You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/16 21:37:43 UTC
[4/7] airavata git commit: Removed gsi related code
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
deleted file mode 100644
index 413b5dc..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.provider.impl;
-
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.provider.AbstractProvider;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.JobDescriptor;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.experiment.CorrectiveAction;
-import org.apache.airavata.model.experiment.ErrorCategory;
-import org.apache.airavata.model.experiment.JobDetails;
-import org.apache.airavata.model.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Map;
-
-//import org.apache.airavata.schemas.gfac.GsisshHostType;
-
-public class GSISSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
-
- /**
-  * Receives the provider's configuration properties. The body is empty, so the
-  * supplied properties are ignored by this provider.
-  *
-  * @param properties configuration key/value pairs; not read
-  */
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-
- /**
-  * Runs the superclass initialization and then makes sure a security context
-  * is registered for the host of this execution.
-  *
-  * @param jobExecutionContext context of the job being initialized
-  * @throws GFacHandlerException if adding the security context fails
-  */
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-     super.initialize(jobExecutionContext);
-     final String failureMessage = "Error while creating SSHSecurityContext";
-     try {
-         final boolean contextMissing =
-                 jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) == null;
-         if (contextMissing) {
-             GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
-         }
-     } catch (ApplicationSettingsException settingsFailure) {
-         // Only the settings failure is logged here; both failures are rethrown alike.
-         log.error(settingsFailure.getMessage());
-         throw new GFacHandlerException(failureMessage, settingsFailure, settingsFailure.getLocalizedMessage());
-     } catch (GFacException gfacFailure) {
-         throw new GFacHandlerException(failureMessage, gfacFailure, gfacFailure.getLocalizedMessage());
-     }
- }
-
- /**
-  * Submits the job described by the execution context as a batch job on the
-  * remote cluster held by the host's {@link GSISecurityContext}, records the
-  * resulting job status and then calls {@link #monitor(JobExecutionContext)}.
-  * <p>
-  * Whatever happens, the finally block saves the collected recovery data
-  * ({@code jobDesc=<xml>,jobId=<id>}) as handler data under this class name.
-  *
-  * @param jobExecutionContext context of the job to submit
-  * @throws GFacProviderException if any step fails; the job is first marked
-  *         FAILED and the stack trace is saved as error details
-  */
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- log.info("Invoking GSISSH Provider Invoke ...");
- // Accumulates the recovery record that the finally block persists.
- StringBuffer data = new StringBuffer();
- ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
- .getComputeResourceDescription();
- // NOTE(review): appDeployDesc is never read in this method.
- ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription();
- JobDetails jobDetails = new JobDetails();
- RemoteCluster remoteCluster = null;
-
- try {
- if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) {
- remoteCluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getRemoteCluster();
- }
- // Either no security context exists for the host or it carries no cluster.
- if (remoteCluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // Build the job descriptor for this execution context and cluster.
- JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, remoteCluster);
- jobDetails.setJobName(jobDescriptor.getJobName());
-
- log.info(jobDescriptor.toXML());
- data.append("jobDesc=").append(jobDescriptor.toXML());
- jobDetails.setJobDescription(jobDescriptor.toXML());
- String jobID = remoteCluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if (jobID == null) {
- // No ID came back: record the placeholder ID "none" and mark the job FAILED.
- // NOTE(review): monitor() below is still invoked in this case - confirm intended.
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- } else {
- // Keep only the part of the returned ID before the first '.'.
- jobDetails.setJobID(jobID.split("\\.")[0]);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
- data.append(",jobId=").append(jobDetails.getJobID());
-
- // The job has now been submitted to the resource; hand it over to monitoring.
- monitor(jobExecutionContext);
- // we know this host is type GsiSSHHostType
- } catch (Exception e) {
- String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- // Persist the full stack trace as the error details.
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } finally {
- // Runs on success and on failure; recover() reads this record back.
- log.info("Saving data for future recovery: ");
- log.info(data.toString());
- GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName());
- }
-
- }
-
- /**
-  * Currently does nothing: the whole implementation below is commented out.
-  * The disabled code looked up the pull and push monitor daemon handlers and
-  * invoked them for the given job.
-  *
-  * @param jobExecutionContext context of the job (unused)
-  * @param sshJobSubmission    SSH job submission settings (unused)
-  * @param jobID               ID of the job (unused)
-  */
- public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
-/* List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- if (daemonHandlers == null) {
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
- for (ThreadedHandler threadedHandler : daemonHandlers) {
- if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pullMonitorHandler = threadedHandler;
- if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
- jobExecutionContext.setProperty("cancel","true");
- pullMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
- " to handle by the GridPullMonitorHandler");
- }
- } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pushMonitorHandler = threadedHandler;
- if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
- pushMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
- " to handle by the GridPushMonitorHandler");
- }
- }
- // have to handle the GridPushMonitorHandler logic
- }
- if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
- }*/
- }
-
- /**
-  * Does nothing: the method body is empty.
-  *
-  * @param jobExecutionContext context of the job (unused)
-  */
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- // Empty body; nothing is released or cleaned up here.
- }
-
- /**
-  * Cancels the job recorded in the execution context on the remote cluster
-  * obtained from the host's {@link GSISecurityContext}, adding that security
-  * context first when it is missing.
-  *
-  * @param jobExecutionContext context holding the job details to cancel
-  * @return {@code true} when the cancel call completed and the job was marked
-  *         CANCELED; {@code false} when there are no job details or no job ID
-  * @throws GFacProviderException if the cancellation fails; error details are
-  *         saved and, when job details exist, the job is marked FAILED
-  */
- public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
-     log.info("canceling the job status in GSISSHProvider!!!!!");
-     JobDetails jobDetails = jobExecutionContext.getJobDetails();
-     String hostName = jobExecutionContext.getHostName();
-     try {
-         if (jobExecutionContext.getSecurityContext(hostName) == null) {
-             GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
-         }
-         RemoteCluster remoteCluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostName)).getRemoteCluster();
-         if (remoteCluster == null) {
-             throw new GFacProviderException("Security context is not set properly");
-         }
-         log.info("Successfully retrieved the Security Context");
-
-         if (jobDetails == null) {
-             log.error("There is not JobDetails so cancelations cannot perform !!!");
-             return false;
-         }
-         if (jobDetails.getJobID() == null) {
-             log.error("No Job Id is set, so cannot perform the cancel operation !!!");
-             return false;
-         }
-         // If this call returns without an exception the cancel is assumed to have succeeded.
-         remoteCluster.cancelJob(jobDetails.getJobID());
-         GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
-         return true;
-     } catch (Exception e) {
-         // One handler replaces the two identical SSHApiException / Exception handlers.
-         String error = "Error cancelling the job on host " + hostName + " message: " + e.getMessage();
-         log.error(error, e);
-         // jobDetails can be null when the failure happened before the null check above.
-         if (jobDetails != null) {
-             // NOTE(review): this overwrites the real job ID although the job may still be running - confirm intended.
-             jobDetails.setJobID("none");
-             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
-         }
-         // Many exceptions carry no cause; fall back to the exception itself.
-         Throwable reported = e.getCause() != null ? e.getCause() : e;
-         GFacUtils.saveErrorDetails(jobExecutionContext, reported.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-         throw new GFacProviderException(error, e);
-     }
- }
-
- /**
-  * Recovers a job after a GFac failure using the handler data saved by
-  * {@link #execute(JobExecutionContext)}, which has the form
-  * {@code jobDesc=<xml>,jobId=<id>}.
-  * <p>
-  * When no usable job ID was saved (no data, no {@code ,jobId=} part, or an ID
-  * of {@code "none"} or empty) the job is submitted again through
-  * {@code execute}. Otherwise the saved description and ID are restored into
-  * the context and the job is handed to {@link #monitor(JobExecutionContext)}.
-  *
-  * @param jobExecutionContext context of the job to recover
-  * @throws GFacProviderException if re-submission or the hand-over to
-  *         monitoring fails
-  */
- public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
-     log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
-     String hostName = jobExecutionContext.getHostName();
-     String jobId = "";
-     String jobDesc = "";
-     boolean resubmit = false;
-     try {
-         final String descPrefix = "jobDesc=";
-         final String idMarker = ",jobId=";
-         String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
-         // The job ID is always appended last, so search from the end: the
-         // descriptor XML itself may contain commas.
-         int idAt = pluginData == null ? -1 : pluginData.lastIndexOf(idMarker);
-         if (idAt < 0) {
-             resubmit = true;
-         } else {
-             String head = pluginData.substring(0, idAt);
-             jobDesc = head.startsWith(descPrefix) ? head.substring(descPrefix.length()) : head;
-             jobId = pluginData.substring(idAt + idMarker.length());
-
-             log.info("Following data have recovered: ");
-             log.info("Job Description: " + jobDesc);
-             log.info("Job Id: " + jobId);
-             resubmit = "none".equals(jobId) || jobId.isEmpty();
-         }
-     } catch (Exception e) {
-         // Reading the saved data failed; as before, log it and fall through to monitoring.
-         log.error("Error while recovering provider", e);
-     }
-     if (resubmit) {
-         // Deliberately outside the try above so that a failed re-submission
-         // reaches the caller instead of being swallowed.
-         try {
-             this.execute(jobExecutionContext);
-         } catch (GFacException e) {
-             log.error("Error while recovering provider", e);
-             throw new GFacProviderException("Error recovering provider", e);
-         }
-         return;
-     }
-     try {
-         // Enough data was recovered: restore the job details and resume monitoring.
-         JobDetails jobDetails = new JobDetails();
-         jobDetails.setJobDescription(jobDesc);
-         jobDetails.setJobID(jobId);
-         jobExecutionContext.setJobDetails(jobDetails);
-         if (jobExecutionContext.getSecurityContext(hostName) == null) {
-             try {
-                 GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
-             } catch (ApplicationSettingsException e) {
-                 log.error(e.getMessage());
-                 throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
-             }
-         }
-         monitor(jobExecutionContext);
-     } catch (Exception e) {
-         log.error("Error while recover the job", e);
-         throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
-     }
- }
-
- /**
-  * Registers the job with the email-based monitor when the preferred job
-  * submission protocol is SSH and the SSH job submission's monitor mode is
-  * {@code JOB_EMAIL_NOTIFICATION_MONITOR}; in every other case nothing is done.
-  *
-  * @param jobExecutionContext context of the job to monitor
-  * @throws GFacException        if the SSH job submission cannot be read from the app catalog
-  * @throws GFacHandlerException if the email monitor cannot be activated
-  */
- @Override
- public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-     final String interfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
-     final SSHJobSubmission submission;
-     try {
-         submission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(interfaceId);
-     } catch (AppCatalogException e) {
-         throw new GFacException("Error while reading compute resource", e);
-     }
-
-     if (jobExecutionContext.getPreferredJobSubmissionProtocol() != JobSubmissionProtocol.SSH) {
-         return;
-     }
-     if (submission.getMonitorMode() != MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
-         return;
-     }
-
-     try {
-         EmailBasedMonitor emailMonitor = EmailMonitorFactory.getEmailBasedMonitor(
-                 submission.getResourceJobManager().getResourceJobManagerType());
-         emailMonitor.addToJobMonitorMap(jobExecutionContext);
-     } catch (AiravataException e) {
-         throw new GFacHandlerException("Error while activating email job monitoring ", e);
-     }
-     // The former fallback to the pull/push daemon monitor handlers
-     // (GridPullMonitorHandler / GridPushMonitorHandler) was commented out and
-     // is not part of this method's behaviour.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
deleted file mode 100644
index 85216b4..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.security;
-
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.core.context.AbstractSecurityContext;
-import org.apache.airavata.gfac.core.RequestData;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Security context for GRID (GSI) access. On top of what
- * {@link AbstractSecurityContext} provides, it carries the {@link RemoteCluster}
- * associated with the context.
- */
-public class GSISecurityContext extends AbstractSecurityContext {
-
- protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class);
-
- /** Cluster associated with this context; {@code null} until one is supplied. */
- private RemoteCluster remoteCluster = null;
-
- /**
-  * Creates a context with a credential reader, request data and a cluster.
-  *
-  * @param credentialReader reader passed to the superclass
-  * @param requestData      request data passed to the superclass
-  * @param remoteCluster    cluster to associate with this context
-  */
- public GSISecurityContext(CredentialReader credentialReader, RequestData requestData, RemoteCluster remoteCluster) {
-     super(credentialReader, requestData);
-     this.remoteCluster = remoteCluster;
- }
-
- /**
-  * Creates a context without a cluster; one can be supplied later through
-  * {@link #setRemoteCluster(RemoteCluster)}.
-  *
-  * @param credentialReader reader passed to the superclass
-  * @param requestData      request data passed to the superclass
-  */
- public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) {
-     super(credentialReader, requestData);
- }
-
- /**
-  * Creates a context that only carries a cluster.
-  *
-  * @param remoteCluster cluster to associate with this context
-  */
- public GSISecurityContext(RemoteCluster remoteCluster) {
-     // Assign the field directly: calling the overridable setter from a
-     // constructor would run subclass code on a partially built object.
-     this.remoteCluster = remoteCluster;
- }
-
- /**
-  * @return the cluster associated with this context, or {@code null} if none was set
-  */
- public RemoteCluster getRemoteCluster() {
-     return remoteCluster;
- }
-
- /**
-  * @param remoteCluster cluster to associate with this context
-  */
- public void setRemoteCluster(RemoteCluster remoteCluster) {
-     this.remoteCluster = remoteCluster;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
deleted file mode 100644
index 36cb84f..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.security;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.credential.Credential;
-import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.core.GFacConstants;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.RequestData;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo;
-import org.globus.gsi.X509Credential;
-import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
-import org.globus.gsi.provider.GlobusProvider;
-import org.globus.myproxy.GetParams;
-import org.globus.myproxy.MyProxy;
-import org.globus.myproxy.MyProxyException;
-import org.gridforum.jgss.ExtendedGSSCredential;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.security.Security;
-import java.security.cert.X509Certificate;
-
-public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo {
- protected static final Logger log = LoggerFactory.getLogger(TokenizedMyProxyAuthInfo.class);
-
- // Renewal threshold (10 * 90 = 900). getCredentials() renews the cached credential
- // when GSSCredential.getRemainingLifetime() falls below this value.
- // NOTE(review): public and non-final, so any code can change it at runtime - confirm intended.
- public static int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90;
-
- // Cached credential; filled lazily by getCredentials() and replaced on renewal.
- private GSSCredential gssCredentials = null;
-
-
- // Used by getCredentialsFromStore() to look up the stored credential.
- private CredentialReader credentialReader;
-
- // Supplies the gateway ID, token ID and MyProxy connection details used by this class.
- private RequestData requestData;
-
- // Property key under which the constructors store the trusted certificate location.
- public static final String X509_CERT_DIR = "X509_CERT_DIR";
-
-
- // Class initialization: register the Globus security provider, then set the trusted
- // certificate path from server settings. A missing setting is only logged, but the
- // RuntimeException thrown for an unreadable path is not caught here.
- static {
- Security.addProvider(new GlobusProvider());
- try {
- setUpTrustedCertificatePath();
- } catch (ApplicationSettingsException e) {
- log.error(e.getLocalizedMessage(), e);
- }
- }
-
- /**
-  * Publishes the absolute form of the given path under the system property
-  * named by {@code GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY}.
-  *
-  * @param trustedCertificatePath location of the trusted certificates
-  * @throws RuntimeException if the path does not exist or cannot be read
-  */
- public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
-
-     final File certLocation = new File(trustedCertificatePath);
-     final boolean usable = certLocation.exists() && certLocation.canRead();
-
-     if (usable) {
-         System.setProperty(GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, certLocation.getAbsolutePath());
-         return;
-     }
-
-     // Log the working directory to help diagnose a wrong relative path.
-     final File workingDir = new File(".");
-     log.info("Current directory " + workingDir.getAbsolutePath());
-     throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
- }
-
- /**
-  * Reads the trusted certificate location from the server settings and passes
-  * it to {@link #setUpTrustedCertificatePath(String)}.
-  *
-  * @throws ApplicationSettingsException if the setting cannot be read
-  */
- private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
-     setUpTrustedCertificatePath(ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION));
- }
-
- /**
-  * Creates the auth info with an explicit credential reader and stores the
-  * trusted certificate location under {@link #X509_CERT_DIR} in the properties.
-  * A failure to read that setting is logged and otherwise ignored.
-  *
-  * @param credentialReader reader used to look up stored credentials
-  * @param requestData      request data used by the credential lookups
-  */
- public TokenizedMyProxyAuthInfo(CredentialReader credentialReader, RequestData requestData) {
-     this.credentialReader = credentialReader;
-     this.requestData = requestData;
-     try {
-         final String trustedCertDir = ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION);
-         properties.setProperty(X509_CERT_DIR, trustedCertDir);
-     } catch (ApplicationSettingsException settingsFailure) {
-         log.error("Error while reading server properties", settingsFailure);
-     }
- }
-
- /**
-  * Creates the auth info without a credential reader and stores the trusted
-  * certificate location under {@link #X509_CERT_DIR} in the properties.
-  * A failure to read that setting is logged and otherwise ignored.
-  *
-  * @param requestData request data used by the credential lookups
-  */
- public TokenizedMyProxyAuthInfo(RequestData requestData) {
-     this.requestData = requestData;
-     try {
-         final String trustedCertDir = ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION);
-         properties.setProperty(X509_CERT_DIR, trustedCertDir);
-     } catch (ApplicationSettingsException settingsFailure) {
-         log.error("Error while reading server properties", settingsFailure);
-     }
- }
-
- /**
-  * Returns the GSS credential to use, obtaining or renewing it as needed.
-  * <p>
-  * With no cached credential, the credential store is tried first and, if it
-  * yields nothing or fails, the default MyProxy login is used. With a cached
-  * credential, it is renewed when its remaining lifetime is below
-  * {@link #CREDENTIAL_RENEWING_THRESH_HOLD}.
-  *
-  * @return the cached, newly obtained or renewed credential; never {@code null}
-  * @throws SecurityException if no credential can be obtained, the remaining
-  *         lifetime cannot be read, or renewal fails
-  */
- public GSSCredential getCredentials() throws SecurityException {
-
-     if (gssCredentials == null) {
-
-         try {
-             gssCredentials = getCredentialsFromStore();
-         } catch (Exception e) {
-             // Best effort: a store failure is logged and the MyProxy fallback is tried.
-             log.error("An exception occurred while retrieving credentials from the credential store. " +
-                     "Will continue with my proxy user name and password. Provided TokenId:" + requestData.getTokenId(), e);
-         }
-
-         if (gssCredentials == null) {
-             // Report through the logger instead of writing to standard output.
-             log.warn("Authenticating with provided token failed, so falling back to authenticate with defaultCredentials");
-             try {
-                 gssCredentials = getDefaultCredentials();
-             } catch (Exception e) {
-                 // Keep the cause so the underlying MyProxy failure stays visible.
-                 throw new SecurityException("Error retrieving my proxy using username password", e);
-             }
-         }
-         // if still null, throw an exception
-         if (gssCredentials == null) {
-             throw new SecurityException("Unable to retrieve my proxy credentials to continue operation.");
-         }
-     } else {
-         try {
-             if (gssCredentials.getRemainingLifetime() < CREDENTIAL_RENEWING_THRESH_HOLD) {
-                 try {
-                     return renewCredentials();
-                 } catch (Exception e) {
-                     throw new SecurityException("Error renewing credentials", e);
-                 }
-             }
-         } catch (GSSException e) {
-             throw new SecurityException("Unable to retrieve remaining life time from credentials.", e);
-         }
-     }
-
-     return gssCredentials;
- }
-
-
- /**
-  * Reads the credential for the request's gateway ID and token ID from the
-  * credential store and converts it to a GSS credential. When no credential
-  * reader is set, one is first obtained from {@code GFacUtils}.
-  *
-  * @return the credential when the token maps to a certificate credential;
-  *         {@code null} when no credential reader is available, the token is
-  *         not found, or the stored credential is of another type
-  * @throws Exception if an error occurred while retrieving credentials
-  */
- public GSSCredential getCredentialsFromStore() throws Exception {
-
-     if (getCredentialReader() == null) {
-         credentialReader = GFacUtils.getCredentialReader();
-         if (credentialReader == null) {
-             return null;
-         }
-     }
-
-     Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
-             getRequestData().getTokenId());
-
-     if (credential == null) {
-         log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and "
-                 + "gateway id - " + getRequestData().getGatewayId());
-         return null;
-     }
-
-     if (!(credential instanceof CertificateCredential)) {
-         log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " +
-                 "Credential type - " + credential.getClass().getName());
-         return null;
-     }
-
-     log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() +
-             " gateway id - " + getRequestData().getGatewayId());
-
-     CertificateCredential certificateCredential = (CertificateCredential) credential;
-     X509Certificate[] certificates = certificateCredential.getCertificates();
-     X509Credential newCredential = new X509Credential(certificateCredential.getPrivateKey(), certificates);
-
-     // The exported credential is deliberately not printed: it is built from the
-     // private key and must not be written to standard output.
-     return new GlobusGSSCredentialImpl(newCredential, GSSCredential.INITIATE_AND_ACCEPT);
- }
-
- /**
-  * Renews the GSS credential through MyProxy, presenting the currently held
-  * credential as the authorization credential; no password is sent.
-  * <p>
-  * Before this can work, the current host has to be added as a trusted renewer.
-  * To do that, execute the following commands:
-  * <pre>
-  * &gt; myproxy-logon -t &lt;LIFETIME&gt; -s &lt;MY PROXY SERVER&gt; -l &lt;USER NAME&gt;
-  * E.g :- &gt; myproxy-logon -t 264 -s myproxy.teragrid.org -l us3
-  * Enter MyProxy pass phrase:
-  * A credential has been received for user us3 in /tmp/x509up_u501.
-  * &gt; myproxy-init -A --cert /tmp/x509up_u501 --key /tmp/x509up_u501 -l ogce -s myproxy.teragrid.org
-  * </pre>
-  *
-  * @return Renewed credentials.
-  * @throws GFacException If an error occurred while renewing credentials.
-  * @throws org.apache.airavata.common.exception.ApplicationSettingsException
-  *         declared by the signature; NOTE(review): no statement in this body
-  *         visibly throws it - confirm whether it is still needed
-  */
- public GSSCredential renewCredentialsAsATrustedHost() throws GFacException, ApplicationSettingsException {
- MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
- GetParams getParams = new GetParams();
- // The held credential authorizes the renewal in place of a pass phrase.
- getParams.setAuthzCreds(gssCredentials);
- getParams.setUserName(getRequestData().getMyProxyUserName());
- getParams.setLifetime(getRequestData().getMyProxyLifeTime());
- try {
- return myproxy.get(gssCredentials, getParams);
- } catch (MyProxyException e) {
- throw new GFacException("An error occurred while renewing security credentials.", e);
- }
- }
-
-
- /**
-  * Logs in to the MyProxy server named in the request data with the request's
-  * user name, password and lifetime, and returns the resulting credential.
-  *
-  * @return Default my proxy credentials.
-  * @throws GFacException If MyProxy reports an error while retrieving credentials.
-  * @throws org.apache.airavata.common.exception.ApplicationSettingsException
-  *         declared by the signature
-  */
- public GSSCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException {
-     final MyProxy proxyClient = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
-     final GSSCredential fetched;
-     try {
-         fetched = proxyClient.get(getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
-                 getRequestData().getMyProxyLifeTime());
-     } catch (MyProxyException failure) {
-         throw new GFacException("An error occurred while retrieving default security credentials.", failure);
-     }
-     return fetched;
- }
-
-
- /**
-  * Renews the cached credential. Renewal as a trusted renewer is tried first;
-  * if that fails for any reason, the failure is logged and the default
-  * user name / password login is used instead.
-  *
-  * @return Renewed credentials, which are also stored as the cached credential.
-  * @throws GFacException If the fallback login fails.
-  * @throws org.apache.airavata.common.exception.ApplicationSettingsException
-  *         declared by the signature
-  */
- public GSSCredential renewCredentials() throws GFacException, ApplicationSettingsException {
-
-     GSSCredential renewed;
-     try {
-         // Preferred path: renew without a password.
-         renewed = renewCredentialsAsATrustedHost();
-     } catch (Exception trustedRenewalFailure) {
-         log.warn("Renewing credentials as a trusted renewer failed", trustedRenewalFailure);
-         renewed = getDefaultCredentials();
-     }
-
-     gssCredentials = renewed;
-     return renewed;
- }
-
- /**
-  * Requests a proxy credential from MyProxy, passing the currently held
-  * credential together with the request's user name, password and lifetime.
-  *
-  * @return The credential returned by MyProxy.
-  * @throws GFacException If MyProxy reports an error while retrieving credentials.
-  * @throws org.apache.airavata.common.exception.ApplicationSettingsException
-  *         declared by the signature
-  */
- public GSSCredential getProxyCredentials() throws GFacException, ApplicationSettingsException {
-
-     final MyProxy proxyClient = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
-     final GSSCredential fetched;
-     try {
-         fetched = proxyClient.get(gssCredentials, getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
-                 getRequestData().getMyProxyLifeTime());
-     } catch (MyProxyException failure) {
-         throw new GFacException("An error occurred while renewing security credentials using user/password.", failure);
-     }
-     return fetched;
- }
-
- /**
-  * Replaces the cached GSS credential.
-  *
-  * @param gssCredentials credential to cache
-  */
- public void setGssCredentials(GSSCredential gssCredentials) {
- this.gssCredentials = gssCredentials;
- }
-
- /**
-  * @return the credential reader, or {@code null} if none has been set
-  */
- public CredentialReader getCredentialReader() {
- return credentialReader;
- }
-
- /**
-  * @param credentialReader reader to use for credential store lookups
-  */
- public void setCredentialReader(CredentialReader credentialReader) {
- this.credentialReader = credentialReader;
- }
-
- /**
-  * @return the request data used by the credential lookups
-  */
- public RequestData getRequestData() {
- return requestData;
- }
-
- /**
-  * @param requestData request data to use for the credential lookups
-  */
- public void setRequestData(RequestData requestData) {
- this.requestData = requestData;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
deleted file mode 100644
index 89e3571..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.util;
-
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.gsi.ssh.impl.HPCRemoteCluster;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.RequestData;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.gsissh.security.TokenizedMyProxyAuthInfo;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.JobDescriptor;
-import org.apache.airavata.gfac.core.JobManagerConfiguration;
-import org.apache.airavata.gfac.gsi.ssh.impl.GSISSHAbstractCluster;
-import org.apache.airavata.gfac.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.experiment.ComputationalResourceScheduling;
-import org.apache.airavata.model.experiment.TaskDetails;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.*;
-
-
-public class GFACGSISSHUtils {
- private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class);
-
- public static final String PBS_JOB_MANAGER = "pbs";
- public static final String SLURM_JOB_MANAGER = "slurm";
- public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
- public static final String LSF_JOB_MANAGER = "lsf";
-
- public static int maxClusterCount = 5;
- public static Map<String, List<RemoteCluster>> clusters = new HashMap<String, List<RemoteCluster>>();
-
- public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
- JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
- JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
- try {
- AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
- SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
- if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE
- || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) {
- logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
- } else if (jobProtocol == JobSubmissionProtocol.SSH && sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) {
- String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
- RequestData requestData = new RequestData(jobExecutionContext.getGatewayID());
- requestData.setTokenId(credentialStoreToken);
- HPCRemoteCluster HPCRemoteCluster = null;
- GSISecurityContext context = null;
-
- TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData);
- CredentialReader credentialReader = GFacUtils.getCredentialReader();
- if (credentialReader != null) {
- CertificateCredential credential = null;
- try {
- credential = (CertificateCredential) credentialReader.getCredential(jobExecutionContext.getGatewayID(), credentialStoreToken);
- requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- }
-
- String key = requestData.getMyProxyUserName() + jobExecutionContext.getHostName()+
- sshJobSubmission.getSshPort();
- boolean recreate = false;
- synchronized (clusters) {
- if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
- recreate = true;
- } else if (clusters.containsKey(key)) {
- int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
- if (clusters.get(key).get(i).getSession().isConnected()) {
- HPCRemoteCluster = (HPCRemoteCluster) clusters.get(key).get(i);
- } else {
- clusters.get(key).remove(i);
- recreate = true;
- }
- if (!recreate) {
- try {
- HPCRemoteCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate
- } catch (Exception e) {
- clusters.get(key).remove(i);
- logger.info("Connection found the connection map is expired, so we create from the scratch");
- maxClusterCount++;
- recreate = true; // we make the HPCRemoteCluster to create again if there is any exception druing connection
- }
- logger.info("Re-using the same connection used with the connection string:" + key);
- context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, HPCRemoteCluster);
- }
- } else {
- recreate = true;
- }
-
- if (recreate) {
- ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(),
- sshJobSubmission.getSshPort());
-
- JobManagerConfiguration jConfig = null;
- String installedParentPath = sshJobSubmission.getResourceJobManager().getJobManagerBinPath();
- String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
- if (jobManager == null) {
- logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
- jConfig = CommonUtils.getPBSJobManager(installedParentPath);
- } else {
- if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
- jConfig = CommonUtils.getPBSJobManager(installedParentPath);
- } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
- jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
- } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
- jConfig = CommonUtils.getUGEJobManager(installedParentPath);
- }else if(LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
- jConfig = CommonUtils.getLSFJobManager(installedParentPath);
- }
- }
- HPCRemoteCluster = new HPCRemoteCluster(serverInfo, tokenizedMyProxyAuthInfo, jConfig);
- context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, HPCRemoteCluster);
- List<RemoteCluster> pbsRemoteClusters = null;
- if (!(clusters.containsKey(key))) {
- pbsRemoteClusters = new ArrayList<RemoteCluster>();
- } else {
- pbsRemoteClusters = clusters.get(key);
- }
- pbsRemoteClusters.add(HPCRemoteCluster);
- clusters.put(key, pbsRemoteClusters);
- }
- }
-
- jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), context);
- }
- } catch (Exception e) {
- throw new GFacException("An error occurred while creating GSI security context", e);
- }
- }
-
- public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, RemoteCluster remoteCluster) {
- JobDescriptor jobDescriptor = new JobDescriptor();
- TaskDetails taskData = jobExecutionContext.getTaskData();
- ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
- try {
- if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")){
- jobDescriptor.setMailOptions(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS));
- String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
-
- if(jobExecutionContext.getTaskData().isSetEmailAddresses()){
- List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
- String elist = GFacUtils.listToCsv(emailList, ',');
- if(emailids != null && !emailids.isEmpty()){
- emailids = emailids +"," + elist;
- }else{
- emailids = elist;
- }
- }
- if(emailids != null && !emailids.isEmpty()){
- logger.info("Email list: "+ emailids);
- jobDescriptor.setMailAddress(emailids);
- }
- }
- } catch (ApplicationSettingsException e) {
- logger.error("ApplicationSettingsException : " +e.getLocalizedMessage());
- }
- // this is common for any application descriptor
- jobDescriptor.setCallBackIp(ServerSettings.getIp());
- jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
- jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
- jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
- jobDescriptor.setExecutablePath(jobExecutionContext.getExecutablePath());
- jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
- jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
- String computationalProjectAccount = taskData.getTaskScheduling().getComputationalProjectAccount();
- taskData.getEmailAddresses();
- if (computationalProjectAccount == null){
- ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference();
- if (computeResourcePreference != null) {
- computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber();
- }
- }
- if (computationalProjectAccount != null) {
- jobDescriptor.setAcountString(computationalProjectAccount);
- }
-
- Random random = new Random();
- int i = random.nextInt(Integer.MAX_VALUE); // We always set the job name
- jobDescriptor.setJobName("A" + String.valueOf(i+99999999));
- jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
-
- List<String> inputValues = new ArrayList<String>();
- MessageContext input = jobExecutionContext.getInMessageContext();
- // sort the inputs first and then build the command List
- Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
- @Override
- public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
- return inputDataObjectType.getInputOrder() - t1.getInputOrder();
- }
- };
- Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
- for (Object object : input.getParameters().values()) {
- if (object instanceof InputDataObjectType) {
- InputDataObjectType inputDOT = (InputDataObjectType) object;
- sortedInputSet.add(inputDOT);
- }
- }
- for (InputDataObjectType inputDataObjectType : sortedInputSet) {
- if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
- continue;
- }
- if (inputDataObjectType.getApplicationArgument() != null
- && !inputDataObjectType.getApplicationArgument().equals("")) {
- inputValues.add(inputDataObjectType.getApplicationArgument());
- }
-
- if (inputDataObjectType.getValue() != null
- && !inputDataObjectType.getValue().equals("")) {
- if (inputDataObjectType.getType() == DataType.URI) {
- // set only the relative path
- String filePath = inputDataObjectType.getValue();
- filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
- inputValues.add(filePath);
- }else {
- inputValues.add(inputDataObjectType.getValue());
- }
-
- }
- }
-
- Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters();
- for (Object outputParam : outputParams.values()) {
- if (outputParam instanceof OutputDataObjectType) {
- OutputDataObjectType output = (OutputDataObjectType) outputParam;
- if (output.getApplicationArgument() != null
- && !output.getApplicationArgument().equals("")) {
- inputValues.add(output.getApplicationArgument());
- }
- if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
- if (output.getType() == DataType.URI){
- String filePath = output.getValue();
- filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
- inputValues.add(filePath);
- }
- }
- }
- }
- jobDescriptor.setInputValues(inputValues);
-
- jobDescriptor.setUserName(((GSISSHAbstractCluster) remoteCluster).getServerInfo().getUserName());
- jobDescriptor.setShellName("/bin/bash");
- jobDescriptor.setAllEnvExport(true);
- jobDescriptor.setOwner(((HPCRemoteCluster) remoteCluster).getServerInfo().getUserName());
-
- ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling();
- if (taskScheduling != null) {
- int totalNodeCount = taskScheduling.getNodeCount();
- int totalCPUCount = taskScheduling.getTotalCPUCount();
-
-// jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
- if (taskScheduling.getComputationalProjectAccount() != null) {
- jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
- }
- if (taskScheduling.getQueueName() != null) {
- jobDescriptor.setQueueName(taskScheduling.getQueueName());
- }
-
- if (totalNodeCount > 0) {
- jobDescriptor.setNodes(totalNodeCount);
- }
- if (taskScheduling.getComputationalProjectAccount() != null) {
- jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
- }
- if (taskScheduling.getQueueName() != null) {
- jobDescriptor.setQueueName(taskScheduling.getQueueName());
- }
- if (totalCPUCount > 0) {
- int ppn = totalCPUCount / totalNodeCount;
- jobDescriptor.setProcessesPerNode(ppn);
- jobDescriptor.setCPUCount(totalCPUCount);
- }
- if (taskScheduling.getWallTimeLimit() > 0) {
- jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit()));
- if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
- jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit()));
- }
- }
-
- if (taskScheduling.getTotalPhysicalMemory() > 0) {
- jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + "");
- }
- } else {
- logger.error("Task scheduling cannot be null at this point..");
- }
-
- ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
- List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
- if (moduleCmds != null) {
- for (String moduleCmd : moduleCmds) {
- jobDescriptor.addModuleLoadCommands(moduleCmd);
- }
- }
- List<String> preJobCommands = appDepDescription.getPreJobCommands();
- if (preJobCommands != null) {
- for (String preJobCommand : preJobCommands) {
- jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext));
- }
- }
-
- List<String> postJobCommands = appDepDescription.getPostJobCommands();
- if (postJobCommands != null) {
- for (String postJobCommand : postJobCommands) {
- jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext));
- }
- }
-
- ApplicationParallelismType parallelism = appDepDescription.getParallelism();
- if (parallelism != null){
- if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){
- Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
- if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
- for (JobManagerCommand command : jobManagerCommands.keySet()) {
- if (command == JobManagerCommand.SUBMISSION) {
- String commandVal = jobManagerCommands.get(command);
- jobDescriptor.setJobSubmitter(commandVal);
- }
- }
- }
- }
- }
- return jobDescriptor;
- }
-
- private static String parseCommand(String value, JobExecutionContext jobExecutionContext) {
- String parsedValue = value.replaceAll("\\$workingDir", jobExecutionContext.getWorkingDir());
- parsedValue = parsedValue.replaceAll("\\$inputDir", jobExecutionContext.getInputDir());
- parsedValue = parsedValue.replaceAll("\\$outputDir", jobExecutionContext.getOutputDir());
- return parsedValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index 5ab2e96..a0d3a9b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -21,13 +21,27 @@
package org.apache.airavata.gfac.impl;
import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.monitor.JobMonitor;
+import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
+import org.apache.airavata.gfac.impl.job.LSFOutputParser;
+import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
+import org.apache.airavata.gfac.impl.job.PBSOutputParser;
+import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration;
+import org.apache.airavata.gfac.impl.job.SlurmOutputParser;
+import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
+import org.apache.airavata.gfac.impl.job.UGEOutputParser;
+import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
@@ -38,6 +52,10 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
import java.util.Map;
public abstract class Factory {
@@ -46,6 +64,8 @@ public abstract class Factory {
private static Map<String, RemoteCluster> remoteClusterMap;
private static LocalEventPublisher localEventPublisher;
private static CuratorFramework curatorClient;
+ private static volatile EmailBasedMonitor emailBasedMonitor; // volatile: getJobMonitor() reads this outside its synchronized block
+ private static Date startMonitorDate = Calendar.getInstance().getTime();
public static GFacEngine getGFacEngine() throws GFacException {
if (engine == null) {
@@ -93,4 +113,37 @@ public abstract class Factory {
}
return curatorClient;
}
+
+ public static JobMonitor getJobMonitor(ResourceJobManagerType resourceJobManagerType) throws AiravataException {
+ if (resourceJobManagerType == ResourceJobManagerType.FORK) {
+ return null; // TODO write a job monitor for this.
+ } else {
+ if (emailBasedMonitor == null) {
+ synchronized (EmailMonitorFactory.class){
+ if (emailBasedMonitor == null) {
+ emailBasedMonitor = new EmailBasedMonitor(resourceJobManagerType);
+ emailBasedMonitor.setDate(startMonitorDate);
+ new Thread(emailBasedMonitor).start();
+ }
+ }
+ }
+ return emailBasedMonitor;
+ }
+ }
+
+ public static JobManagerConfiguration getPBSJobManager(String installedPath) {
+ return new PBSJobConfiguration("PBSTemplate.xslt",".pbs", installedPath, new PBSOutputParser());
+ }
+
+ public static JobManagerConfiguration getSLURMJobManager(String installedPath) {
+ return new SlurmJobConfiguration("SLURMTemplate.xslt", ".slurm", installedPath, new SlurmOutputParser());
+ }
+
+ public static JobManagerConfiguration getUGEJobManager(String installedPath) {
+ return new UGEJobConfiguration("UGETemplate.xslt", ".pbs", installedPath, new UGEOutputParser());
+ }
+
+ public static JobManagerConfiguration getLSFJobManager(String installedPath) {
+ return new LSFJobConfiguration("LSFTemplate.xslt", ".lsf", installedPath, new LSFOutputParser());
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
new file mode 100644
index 0000000..645cb30
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UserInfo;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.apache.airavata.gfac.core.cluster.OutputParser;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.model.status.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * One Remote cluster instance for each compute resource.
+ */
+public class HPCRemoteCluster implements RemoteCluster{
+ private static final Logger log = LoggerFactory.getLogger(HPCRemoteCluster.class);
+ private final SSHKeyAuthentication authentication;
+ private final ServerInfo serverInfo;
+ private final JobManagerConfiguration jobManagerConfiguration;
+ private final JSch jSch;
+ private Session session;
+ private OutputParser outputParser;
+
+ public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo
+ authenticationInfo, OutputParser outputParser) throws AiravataException {
+ try {
+ this.serverInfo = serverInfo;
+ this.jobManagerConfiguration = jobManagerConfiguration;
+ if (authenticationInfo instanceof SSHKeyAuthentication) {
+ authentication = (SSHKeyAuthentication) authenticationInfo;
+ } else {
+ throw new AiravataException("Support ssh key authentication only");
+ }
+ this.outputParser = outputParser;
+ jSch = new JSch();
+ jSch.addIdentity(authentication.getPrivateKeyFilePath(), authentication.getPublicKeyFilePath(), authentication
+ .getPassphrase().getBytes());
+ session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort());
+ session.setUserInfo(new DefaultUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase()));
+ session.connect(); // 0 connection timeout
+ } catch (JSchException e) {
+ throw new AiravataException("JSch initialization error ", e);
+ }
+ }
+
+ @Override
+ public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+ scpTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
+ RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
+
+ StandardOutReader reader = new StandardOutReader();
+ executeCommand(submitCommand, reader);
+ throwExceptionOnError(reader, submitCommand);
+ return outputParser.parseJobSubmission(reader.getStdOutputString());
+ }
+
+ @Override
+ public void scpTo(String localFile, String remoteFile) throws SSHApiException {
+ int retry = 3; // total number of transfer attempts before giving up
+ while (retry > 0) {
+ try {
+ if (!session.isConnected()) {
+ session.connect();
+ }
+ log.info("Transferring localhost:" + localFile + " to " + serverInfo.getHost() + ":" + remoteFile);
+ SSHUtils.scpTo(localFile, remoteFile, session);
+ retry = 0; // success: leave the retry loop
+ } catch (Exception e) {
+ retry--;
+ if (!session.isConnected()) {
+ try {
+ session.connect();
+ } catch (JSchException e1) {
+ throw new SSHApiException("JSch Session connection failed", e1); // keep the cause so the reconnect failure can be diagnosed
+ }
+ }
+ if (retry == 0) {
+ throw new SSHApiException("Failed to scp localhost:" + localFile + " to " + serverInfo.getHost() +
+ ":" + remoteFile, e);
+ } else {
+ log.info("Retry transfer localhost:" + localFile + " to " + serverInfo.getHost() + ":" +
+ remoteFile);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void scpFrom(String remoteFile, String localFile) throws SSHApiException {
+ int retry = 3; // total number of transfer attempts before giving up
+ while(retry>0) {
+ try {
+ if (!session.isConnected()) {
+ session.connect();
+ }
+ log.info("Transferring " + serverInfo.getHost() + ":" + remoteFile + " To localhost:" + localFile);
+ SSHUtils.scpFrom(remoteFile, localFile, session);
+ retry=0; // success: leave the retry loop
+ } catch (Exception e) {
+ retry--;
+ if (!session.isConnected()) {
+ try {
+ session.connect();
+ } catch (JSchException e1) {
+ throw new SSHApiException("JSch Session connection failed", e1); // keep the cause so the reconnect failure can be diagnosed
+ }
+ }
+ if (retry == 0) {
+ throw new SSHApiException("Failed to scp " + serverInfo.getHost() + ":" + remoteFile + " to " +
+ "localhost:" + localFile, e);
+ } else {
+ log.info("Retry transfer " + serverInfo.getHost() + ":" + remoteFile + " to localhost:" + localFile);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException {
+ try {
+ if(!session.isConnected()){
+ session.connect();
+ }
+ log.info("Transferring from:" + remoteFileSource + " To: " + remoteFileTarget);
+ SSHUtils.scpThirdParty(remoteFileSource, remoteFileTarget, session);
+ } catch (IOException | JSchException e) {
+ throw new SSHApiException("Failed scp file:" + remoteFileSource + " to remote file "
+ +remoteFileTarget , e);
+ }
+ }
+
+ @Override
+ public void makeDirectory(String directoryPath) throws SSHApiException {
+ try {
+ if (!session.isConnected()) {
+ session.connect(); // reconnect lazily; the session is reused across calls
+ }
+ log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
+ SSHUtils.makeDirectory(directoryPath, session);
+ } catch (JSchException | IOException e) {
+ throw new SSHApiException("Failed to create directory " + serverInfo.getHost() + ":" + directoryPath, e); // propagate the cause
+ }
+ }
+
+ @Override
+ public boolean cancelJob(String jobId) throws SSHApiException {
+ RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId);
+ StandardOutReader reader = new StandardOutReader();
+ executeCommand(cancelCommand, reader);
+ throwExceptionOnError(reader, cancelCommand);
+ return true;
+ }
+
+ @Override
+ public JobStatus getJobStatus(String jobId) throws SSHApiException {
+ RawCommandInfo monitorCommand = jobManagerConfiguration.getMonitorCommand(jobId);
+ StandardOutReader reader = new StandardOutReader();
+ executeCommand(monitorCommand, reader);
+ throwExceptionOnError(reader, monitorCommand);
+ return outputParser.parseJobStatus(jobId, reader.getStdOutputString());
+ }
+
+ @Override
+ public String getJobIdByJobName(String jobName, String userName) throws SSHApiException {
+ RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
+ StandardOutReader reader = new StandardOutReader();
+ executeCommand(jobIdMonitorCommand, reader);
+ throwExceptionOnError(reader, jobIdMonitorCommand);
+ return outputParser.parseJobId(jobName, reader.getStdOutputString());
+ }
+
+ @Override
+ public void getJobStatuses(String userName, Map<String, JobStatus> jobStatusMap) throws SSHApiException {
+ RawCommandInfo userBasedMonitorCommand = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
+ StandardOutReader reader = new StandardOutReader();
+ executeCommand(userBasedMonitorCommand, reader);
+ throwExceptionOnError(reader, userBasedMonitorCommand);
+ outputParser.parseJobStatuses(userName, jobStatusMap, reader.getStdOutputString());
+ }
+
+ @Override
+ public List<String> listDirectory(String directoryPath) throws SSHApiException {
+ try {
+ if (!session.isConnected()) {
+ session.connect(); // reconnect lazily; the session is reused across calls
+ }
+ log.info("Listing directory: " + serverInfo.getHost() + ":" + directoryPath);
+ return SSHUtils.listDirectory(directoryPath, session);
+ } catch (JSchException | IOException e) {
+ throw new SSHApiException("Failed to list directory " + serverInfo.getHost() + ":" + directoryPath, e); // propagate the cause
+ }
+ }
+
+ @Override
+ public Session getSession() throws SSHApiException {
+ return session;
+ }
+
+ @Override
+ public void disconnect() throws SSHApiException {
+ session.disconnect();
+ }
+
+ /**
+ * Inspects the captured standard error of a remote command and throws if it indicates a failure; returns nothing.
+ * @param reader - command output reader
+ * @param submitCommand - command which executed in remote machine.
+ * @throws SSHApiException if standard error names the command (text after its last file separator) without "Warning", or contains "error".
+ */
+ private void throwExceptionOnError(StandardOutReader reader, RawCommandInfo submitCommand) throws SSHApiException{
+ String stdErrorString = reader.getStdErrorString();
+ String command = submitCommand.getCommand().substring(submitCommand.getCommand().lastIndexOf(File.separator)
+ + 1);
+ if (stdErrorString == null) {
+ // nothing to do: no standard error was captured
+ }else if ((stdErrorString.contains(command.trim()) && !stdErrorString.contains("Warning")) || stdErrorString
+ .contains("error")) {
+ log.error("Command {} , Standard Error output {}", command, stdErrorString);
+ throw new SSHApiException("Error running command " + command + " on remote cluster. StandardError: " +
+ stdErrorString);
+ }
+ }
+
+ private void executeCommand(CommandInfo commandInfo, CommandOutput commandOutput) throws SSHApiException {
+ String command = commandInfo.getCommand();
+ ChannelExec channelExec = null;
+ try {
+ if (!session.isConnected()) {
+ session.connect();
+ }
+ channelExec = ((ChannelExec) session.openChannel("exec"));
+ channelExec.setCommand(command);
+ channelExec.setInputStream(null);
+ channelExec.setErrStream(commandOutput.getStandardError());
+ log.info("Executing command {}", commandInfo.getCommand());
+ channelExec.connect();
+ commandOutput.onOutput(channelExec);
+ } catch (JSchException e) {
+ throw new SSHApiException("Unable to execute command - ", e);
+ }finally {
+ //Only disconnecting the channel, session can be reused
+ if (channelExec != null) {
+ channelExec.disconnect();
+ }
+ }
+ }
+
+ @Override
+ public ServerInfo getServerInfo() {
+ return this.serverInfo;
+ }
+
+ private static class DefaultUserInfo implements UserInfo { // static: never touches the enclosing HPCRemoteCluster instance
+
+ private final String userName;
+ private final String password;
+ private final String passphrase;
+
+ public DefaultUserInfo(String userName, String password, String passphrase) {
+ this.userName = userName;
+ this.password = password;
+ this.passphrase = passphrase;
+ }
+
+ @Override
+ public String getPassphrase() {
+ return passphrase; // return what the constructor was given instead of silently discarding it
+ }
+
+ @Override
+ public String getPassword() {
+ return password; // return what the constructor was given instead of silently discarding it
+ }
+
+ @Override
+ public boolean promptPassword(String s) {
+ return false; // non-interactive: never prompt
+ }
+
+ @Override
+ public boolean promptPassphrase(String s) {
+ return false; // non-interactive: never prompt
+ }
+
+ @Override
+ public boolean promptYesNo(String s) {
+ return false; // non-interactive: answer "no" to every yes/no question
+ }
+
+ @Override
+ public void showMessage(String s) {
+ // intentionally ignored: there is no user to show messages to
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index eef6cf3..6a100a9 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -25,7 +25,6 @@ import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.gsi.ssh.impl.StandardOutReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java
new file mode 100644
index 0000000..e34858b
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Channel;
+
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+ /**
+  * {@link CommandOutput} implementation that buffers a command's standard output and
+  * standard error in memory and exposes both as strings.
+  */
+ public class StandardOutReader implements CommandOutput {
+
+     private static final Logger logger = LoggerFactory.getLogger(StandardOutReader.class);
+
+     /** Size of the chunk buffer used while draining the channel. */
+     private static final int BUFFER_SIZE = 1024;
+
+     /** Pause between polls while the channel is open but has no data ready. */
+     private static final long POLL_INTERVAL_MS = 100L;
+
+     String stdOutputString = null;
+     ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+
+     /**
+      * Drains the channel's input stream until the channel is closed and stores the
+      * collected text via {@link #setStdOutputString(String)}.
+      *
+      * <p>Bytes are accumulated first and decoded once at the end (platform default
+      * charset, the same one {@link #getStdErrorString()} uses), so a multi-byte character
+      * that straddles two reads is not corrupted. If reading fails with an
+      * {@link IOException} the error is logged and the stored output is left untouched.
+      *
+      * @param channel channel whose input stream carries the command's standard output
+      */
+     public void onOutput(Channel channel) {
+         try {
+             ByteArrayOutputStream captured = new ByteArrayOutputStream();
+             InputStream inputStream = channel.getInputStream();
+             byte[] buffer = new byte[BUFFER_SIZE];
+             while (true) {
+                 while (inputStream.available() > 0) {
+                     int read = inputStream.read(buffer, 0, buffer.length);
+                     if (read < 0) {
+                         break;
+                     }
+                     captured.write(buffer, 0, read);
+                 }
+                 if (channel.isClosed()) {
+                     // Data may have arrived between the drain above and the close check;
+                     // go round once more rather than dropping it.
+                     if (inputStream.available() > 0) {
+                         continue;
+                     }
+                     break;
+                 }
+                 try {
+                     // Yield instead of spinning at full CPU while the command is running.
+                     Thread.sleep(POLL_INTERVAL_MS);
+                 } catch (InterruptedException e) {
+                     // Preserve the interrupt for the caller and stop waiting.
+                     Thread.currentThread().interrupt();
+                     break;
+                 }
+             }
+             this.setStdOutputString(captured.toString());
+         } catch (IOException e) {
+             logger.error(e.getMessage(), e);
+         }
+     }
+
+     /**
+      * Records the exit code of the executed program in the log.
+      *
+      * @param code exit status reported for the command
+      */
+     public void exitCode(int code) {
+         logger.info("Program exit code - {}", code);
+     }
+
+     /** @return the captured standard output, or {@code null} if none has been stored */
+     public String getStdOutputString() {
+         return stdOutputString;
+     }
+
+     /** @param stdOutputString text to expose as the command's standard output */
+     public void setStdOutputString(String stdOutputString) {
+         this.stdOutputString = stdOutputString;
+     }
+
+     /** @return everything written so far to the error stream, decoded as a string */
+     public String getStdErrorString() {
+         return errorStream.toString();
+     }
+
+     /** @return the in-memory stream that collects the command's standard error */
+     public OutputStream getStandardError() {
+         return errorStream;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFJobConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFJobConfiguration.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFJobConfiguration.java
new file mode 100644
index 0000000..b84b8ff
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFJobConfiguration.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl.job;
+
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.gfac.core.cluster.OutputParser;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+ /**
+  * {@link JobManagerConfiguration} that builds the command lines for the {@code bsub},
+  * {@code bjobs} and {@code bkill} executables, each prefixed with the configured
+  * installation path.
+  */
+ public class LSFJobConfiguration implements JobManagerConfiguration {
+     private final static Logger logger = LoggerFactory.getLogger(LSFJobConfiguration.class);
+
+     private String jobDescriptionTemplateName;
+
+     private String scriptExtension;
+
+     private String installedPath;
+
+     private OutputParser parser;
+
+     /**
+      * Creates an empty configuration. No installation path is set, so generated
+      * commands carry no path prefix.
+      */
+     public LSFJobConfiguration() {
+         // this can be used to construct and use setter methods to set all the params in order
+     }
+
+     /**
+      * @param jobDescriptionTemplateName name of the job description template
+      * @param scriptExtension            extension used for generated job scripts
+      * @param installedPath              directory holding the LSF executables; a trailing
+      *                                   {@code /} is appended when missing, and
+      *                                   {@code null} is treated like the empty string
+      * @param parser                     parser for the command output
+      */
+     public LSFJobConfiguration(String jobDescriptionTemplateName,
+                                String scriptExtension, String installedPath, OutputParser parser) {
+         this.jobDescriptionTemplateName = jobDescriptionTemplateName;
+         this.scriptExtension = scriptExtension;
+         this.parser = parser;
+         this.installedPath = normalizeInstalledPath(installedPath);
+     }
+
+     /** Returns {@code path} with a trailing {@code /}, or the empty string for null/empty input. */
+     private static String normalizeInstalledPath(String path) {
+         if (path == null || path.isEmpty()) {
+             return "";
+         }
+         return path.endsWith("/") ? path : path + "/";
+     }
+
+     /** Prefixes {@code commandLine} with the installation path, if one is configured. */
+     private String lsfCommand(String commandLine) {
+         // installedPath stays null after the no-arg constructor; never emit "null" as a prefix.
+         String prefix = (installedPath == null) ? "" : installedPath;
+         return prefix + commandLine;
+     }
+
+     /** @return {@code bkill <jobID>} */
+     @Override
+     public RawCommandInfo getCancelCommand(String jobID) {
+         return new RawCommandInfo(lsfCommand("bkill " + jobID));
+     }
+
+     @Override
+     public String getJobDescriptionTemplateName() {
+         return jobDescriptionTemplateName;
+     }
+
+     /** @return {@code bjobs <jobID>} */
+     @Override
+     public RawCommandInfo getMonitorCommand(String jobID) {
+         return new RawCommandInfo(lsfCommand("bjobs " + jobID));
+     }
+
+     /** @return {@code bjobs -u <userName>} */
+     @Override
+     public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+         return new RawCommandInfo(lsfCommand("bjobs -u " + userName));
+     }
+
+     /**
+      * @param jobName  job name to look up
+      * @param userName not used by this implementation
+      * @return {@code bjobs -J <jobName>}
+      */
+     @Override
+     public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+         return new RawCommandInfo(lsfCommand("bjobs -J " + jobName));
+     }
+
+     @Override
+     public String getScriptExtension() {
+         return scriptExtension;
+     }
+
+     /**
+      * Builds the submit command, feeding the script to {@code bsub} on standard input.
+      * Only the file-name part of {@code pbsFilePath} is used; it is resolved against
+      * {@code workingDirectory}.
+      *
+      * @param workingDirectory directory that contains the job script
+      * @param pbsFilePath      path of the job script; only its file name is used
+      * @return {@code bsub < <workingDirectory>/<script name>}
+      */
+     @Override
+     public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+         // Use '/' rather than the local File.separator, matching the '/' convention this
+         // class already applies to installedPath.
+         return new RawCommandInfo(lsfCommand("bsub < " +
+                 workingDirectory + "/" + FilenameUtils.getName(pbsFilePath)));
+     }
+
+     @Override
+     public OutputParser getParser() {
+         return parser;
+     }
+
+     public void setParser(OutputParser parser) {
+         this.parser = parser;
+     }
+
+     /** @return the normalized installation path, or {@code null} if none was configured */
+     @Override
+     public String getInstalledPath() {
+         return installedPath;
+     }
+
+     @Override
+     public String getBaseCancelCommand() {
+         return "bkill";
+     }
+
+     @Override
+     public String getBaseMonitorCommand() {
+         return "bjobs";
+     }
+
+     @Override
+     public String getBaseSubmitCommand() {
+         return "bsub";
+     }
+ }