You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 17:07:14 UTC
[1/3] fixing more packaging issues
Repository: airavata
Updated Branches:
refs/heads/master 49b6987f3 -> 6209ee096
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
new file mode 100644
index 0000000..b0fd9eb
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -0,0 +1,310 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.ssh.provider.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.CommandExecutor;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
+import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * Execute application using remote SSH
+ */
+public class SSHProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
+ private Cluster cluster;
+ private String jobID = null;
+ private String taskID = null;
+ // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
+ private boolean hpcType = false;
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ super.initialize(jobExecutionContext);
+ if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ taskID = jobExecutionContext.getTaskData().getTaskID();
+ if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+ jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ details.setJobID(taskID);
+ details.setJobDescription(remoteFile);
+ jobExecutionContext.setJobDetails(details);
+ JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
+ details.setJobDescription(jobDescriptor.toXML());
+
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
+ log.info(remoteFile);
+ try {
+ File runscript = createShellScript(jobExecutionContext);
+ cluster.scpTo(remoteFile, runscript.getAbsolutePath());
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+ }else{
+ hpcType = true;
+ }
+ }
+
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ if (!hpcType) {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ try {
+ /*
+ * Execute
+ */
+ String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ details.setJobDescription(execuable);
+
+// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
+ RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
+
+ StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
+
+ CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
+ String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
+
+ log.info("stdout=" + stdOutputString);
+
+// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ } finally {
+ if (cluster != null) {
+ try {
+ cluster.disconnect();
+ } catch (SSHApiException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+ } else {
+ try {
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ HostDescriptionType host = jobExecutionContext.getApplicationContext().
+ getHostDescription().getType();
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+ JobDetails jobDetails = new JobDetails();
+ String taskID = jobExecutionContext.getTaskData().getTaskID();
+ try {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ // This installed path is a mandetory field, because this could change based on the computing resource
+ JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+
+ log.info(jobDescriptor.toXML());
+
+ jobDetails.setJobDescription(jobDescriptor.toXML());
+
+ String jobID = cluster.submitBatchJob(jobDescriptor);
+ jobExecutionContext.setJobDetails(jobDetails);
+ if (jobID == null) {
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ } else {
+ jobDetails.setJobID(jobID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ }
+
+ } catch (SSHApiException e) {
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ } catch (Exception e) {
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ }
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+
+ }
+
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+
+ private File createShellScript(JobExecutionContext context) throws IOException {
+ ApplicationDeploymentDescriptionType app = context.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
+ + new Random().nextLong();
+
+ File shellScript = File.createTempFile(uniqueDir, "sh");
+ OutputStream out = new FileOutputStream(shellScript);
+
+ out.write("#!/bin/bash\n".getBytes());
+ out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
+ .getBytes());
+ // get the env of the host and the application
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+ Map<String, String> nv = new HashMap<String, String>();
+ if (env != null) {
+ for (int i = 0; i < env.length; i++) {
+ String key = env[i].getName();
+ String value = env[i].getValue();
+ nv.put(key, value);
+ }
+ }
+ for (Entry<String, String> entry : nv.entrySet()) {
+ log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
+ out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+
+ }
+
+ // prepare the command
+ final String SPACE = " ";
+ StringBuffer cmd = new StringBuffer();
+ cmd.append(app.getExecutableLocation());
+ cmd.append(SPACE);
+
+ MessageContext input = context.getInMessageContext();
+ ;
+ Map<String, Object> inputs = input.getParameters();
+ Set<String> keys = inputs.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+ for (String value : values) {
+ cmd.append(value);
+ cmd.append(SPACE);
+ }
+ } else {
+ String paramValue = MappingFactory.toString(actualParameter);
+ cmd.append(paramValue);
+ cmd.append(SPACE);
+ }
+ }
+ // We redirect the error and stdout to remote files, they will be read
+ // in later
+ cmd.append(SPACE);
+ cmd.append("1>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardOutput());
+ cmd.append(SPACE);
+ cmd.append("2>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardError());
+
+ String cmdStr = cmd.toString();
+ log.info("Command = " + cmdStr);
+ out.write((cmdStr + "\n").getBytes());
+ String message = "\"execuationSuceeded\"";
+ out.write(("echo " + message + "\n").getBytes());
+ out.close();
+
+ return shellScript;
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+ /**
+ * This method will read standard output and if there's any it will be parsed
+ * @param jobIDReaderCommandOutput
+ * @param errorMsg
+ * @return
+ * @throws SSHApiException
+ */
+ private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
+ String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
+ String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
+
+ if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
+ log.error("Standard Error output : " + stdErrorString);
+ throw new SSHApiException(errorMsg + stdErrorString);
+ }
+ return stdOutputString;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/integration-tests/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/integration-tests/src/test/resources/gfac-config.xml b/modules/integration-tests/src/test/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/integration-tests/src/test/resources/gfac-config.xml
+++ b/modules/integration-tests/src/test/resources/gfac-config.xml
@@ -24,7 +24,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
@@ -24,7 +24,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
+++ b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
@@ -24,7 +24,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
[2/3] fixing more packaging issues
Posted by la...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
new file mode 100644
index 0000000..03c3fee
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
@@ -0,0 +1,527 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gram.provider.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.JobSubmissionFault;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.gram.security.GSISecurityContext;
+import org.apache.airavata.gfac.gram.util.GramJobSubmissionListener;
+import org.apache.airavata.gfac.gram.util.GramProviderUtils;
+import org.apache.airavata.gfac.notification.events.JobIDEvent;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.globus.gram.GramException;
+import org.globus.gram.GramJob;
+import org.globus.gram.WaitingForCommitException;
+import org.globus.gram.internal.GRAMConstants;
+import org.globus.gram.internal.GRAMProtocolErrorConstants;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GramProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
+
+ private GramJob job;
+ private GramJobSubmissionListener listener;
+ private boolean twoPhase = true;
+
+ /**
+ * If normal job submission fail due to an authorisation failure or script failure we
+ * will re-attempt to submit the job. In-order to avoid any recursive loop during a continuous
+ * failure we track whether failure paths are tried or not. Following booleans keeps track whether
+ * we already tried a failure path or not.
+ */
+ /**
+ * To track job submissions during a authorisation failure while requesting job.
+ */
+ private boolean renewCredentialsAttempt = false;
+ /**
+ * To track job submission during a script error situation.
+ */
+ private boolean reSubmissionInProgress = false;
+ /**
+ * To track authorisation failures during status monitoring.
+ */
+ private boolean authorisationFailedAttempt = false;
+
+ private static final Map<String, GramJob> currentlyExecutingJobCache
+ = new ConcurrentHashMap<String, GramJob>();
+
+ private static Properties resources;
+
+ static {
+ try {
+
+ String propFileName = "errors.properties";
+ resources = new Properties();
+ InputStream inputStream = GramProvider.class.getClassLoader()
+ .getResourceAsStream(propFileName);
+
+ if (inputStream == null) {
+ throw new FileNotFoundException("property file '" + propFileName
+ + "' not found in the classpath");
+ }
+
+ resources.load(inputStream);
+
+ } catch (FileNotFoundException mre) {
+ log.error("errors.properties not found", mre);
+ } catch (IOException e) {
+ log.error("Error reading errors.properties file", e);
+ }
+ }
+
+
+ // This method prepare the environment before the application invocation.
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+
+ try {
+ super.initialize(jobExecutionContext);
+ String strTwoPhase = ServerSettings.getSetting("TwoPhase");
+ if (strTwoPhase != null) {
+ twoPhase = Boolean.parseBoolean(strTwoPhase);
+ log.info("Two phase commit is set to " + twoPhase);
+ }
+ } catch (ApplicationSettingsException e) {
+ log.warn("Error reading TwoPhase property from configurations.", e);
+ }
+
+ job = GramProviderUtils.setupEnvironment(jobExecutionContext, twoPhase);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+ job.addListener(listener);
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException{
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().
+ getHostDescription().getType();
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+
+ StringBuilder stringBuilder = new StringBuilder();
+ try {
+
+ GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+ getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+ job.setCredentials(gssCred);
+ // We do not support multiple gatekeepers in XBaya GUI, so we simply pick the 0th element in the array
+ String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
+ log.info("Request to contact:" + gateKeeper);
+
+ stringBuilder.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
+ .append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
+ .append(" temp directory = ").append(app.getScratchWorkingDirectory())
+ .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
+
+ log.info(stringBuilder.toString());
+
+ submitJobs(gateKeeper, jobExecutionContext, host);
+
+ } catch (ApplicationSettingsException e) {
+ throw new GFacException(e.getMessage(), e);
+ } finally {
+ if (job != null) {
+ try {
+ /*
+ * Remove listener
+ */
+ job.removeListener(listener);
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ }
+ }
+ }
+
+ private void submitJobs(String gateKeeper,
+ JobExecutionContext jobExecutionContext,
+ GlobusHostType globusHostType) throws GFacException, GFacProviderException {
+ boolean applicationSaved=false;
+ String taskID = jobExecutionContext.getTaskData().getTaskID();
+
+ if (twoPhase) {
+ try {
+ /*
+ * The first boolean is to force communication through SSLv3
+ * The second boolean is to specify the job is a batch job - use true for interactive and false for
+ * batch.
+ * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
+ */
+ job.request(true, gateKeeper, false, false);
+
+ // Single boolean to track all authentication failures, therefore we need to re-initialize
+ // this here
+ renewCredentialsAttempt = false;
+
+ } catch (WaitingForCommitException e) {
+ String jobID = job.getIDAsString();
+
+ details.setJobID(jobID);
+ details.setJobDescription(job.getRSL());
+ jobExecutionContext.setJobDetails(details);
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.UN_SUBMITTED);
+
+ applicationSaved=true;
+ String jobStatusMessage = "Un-submitted JobID= " + jobID;
+ log.info(jobStatusMessage);
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+ log.info("Two phase commit: sending COMMIT_REQUEST signal; Job id - " + jobID);
+
+ try {
+ job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
+
+ } catch (GramException gramException) {
+ throw new GFacException("Error while sending commit request. Job Id - "
+ + job.getIDAsString(), gramException);
+ } catch (GSSException gssException) {
+
+ // User credentials are invalid
+ log.error("Error while submitting commit request - Credentials provided are invalid. Job Id - "
+ + job.getIDAsString(), e);
+ log.info("Attempting to renew credentials and re-submit commit signal...");
+ GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ renewCredentials(jobExecutionContext);
+
+ try {
+ job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
+ } catch (GramException e1) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacException("Error while sending commit request. Job Id - "
+ + job.getIDAsString(), e1);
+ } catch (GSSException e1) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacException("Error while sending commit request. Job Id - "
+ + job.getIDAsString() + ". Credentials provided invalid", e1);
+ }
+ }
+ GFacUtils.updateJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
+ jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
+ log.info(jobStatusMessage);
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+ } catch (GSSException e) {
+ // Renew credentials and re-submit
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+
+ reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
+
+ } catch (GramException e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+
+ throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
+ }
+ } else {
+
+ /*
+ * The first boolean is to force communication through SSLv3
+ * The second boolean is to specify the job is a batch job - use true for interactive and false for
+ * batch.
+ * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
+ */
+ try {
+
+ job.request(true, gateKeeper, false, false);
+ renewCredentialsAttempt = false;
+
+ } catch (GramException e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
+ } catch (GSSException e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ // Renew credentials and re-submit
+ reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
+ }
+
+ String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
+ log.info(jobStatusMessage);
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+ }
+
+ currentlyExecutingJobCache.put(job.getIDAsString(), job);
+ /*
+ * Wait until job is done
+ */
+ listener.waitFor();
+
+ checkJobStatus(jobExecutionContext, globusHostType, gateKeeper);
+
+ }
+
+ private void renewCredentials(JobExecutionContext jobExecutionContext) throws GFacException {
+
+ renewCredentials(this.job, jobExecutionContext);
+ }
+
+ private void renewCredentials(GramJob gramJob, JobExecutionContext jobExecutionContext) throws GFacException {
+
+ try {
+ GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+ getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).renewCredentials();
+ gramJob.renew(gssCred);
+ } catch (GramException e1) {
+ throw new GFacException("Unable to renew credentials. Job Id - "
+ + gramJob.getIDAsString(), e1);
+ } catch (GSSException e1) {
+ throw new GFacException("Unable to renew credentials. Job Id - "
+ + gramJob.getIDAsString(), e1);
+ } catch (ApplicationSettingsException e) {
+ throw new GFacException(e.getLocalizedMessage(), e);
+ }
+ }
+
+ private void reSubmitJob(String gateKeeper,
+ JobExecutionContext jobExecutionContext,
+ GlobusHostType globusHostType, Exception e) throws GFacException, GFacProviderException {
+
+ if (!renewCredentialsAttempt) {
+
+ renewCredentialsAttempt = true;
+
+ // User credentials are invalid
+ log.error("Error while submitting job - Credentials provided are invalid. Job Id - "
+ + job.getIDAsString(), e);
+ log.info("Attempting to renew credentials and re-submit jobs...");
+
+ // Remove existing listener and register a new listener
+ job.removeListener(listener);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+
+ job.addListener(listener);
+
+ renewCredentials(jobExecutionContext);
+
+ submitJobs(gateKeeper, jobExecutionContext, globusHostType);
+
+ } else {
+ throw new GFacException("Error while submitting job - Credentials provided are invalid. Job Id - "
+ + job.getIDAsString(), e);
+ }
+
+ }
+
+ private void reSubmitJob(String gateKeeper,
+ JobExecutionContext jobExecutionContext,
+ GlobusHostType globusHostType) throws GFacException, GFacProviderException {
+
+ // User credentials are invalid
+ log.info("Attempting to renew credentials and re-submit jobs...");
+
+ // Remove existing listener and register a new listener
+ job.removeListener(listener);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+
+ job.addListener(listener);
+
+ renewCredentials(jobExecutionContext);
+
+ submitJobs(gateKeeper, jobExecutionContext, globusHostType);
+
+ }
+
+
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ }
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ cancelSingleJob(jobId, jobExecutionContext);
+ }
+
+
+ /**
+ * Cancels a single GRAM job. If the job is present in the in-memory cache of
+ * currently executing jobs, the cached GramJob (which already carries
+ * credentials) is used; otherwise a new GramJob is reconstructed from the
+ * job id and the credentials are taken from the GSI security context.
+ * Jobs that have already reached a terminal state (DONE or FAILED) are not
+ * cancelled.
+ *
+ * @param jobId the GRAM job id (handle URL)
+ * @param context execution context supplying the GSI security context
+ * @throws GFacException on an invalid job id, credential problems, or cancel failure
+ */
+ private void cancelSingleJob(String jobId, JobExecutionContext context) throws GFacException {
+ // First check whether job id is in the cache
+ if (currentlyExecutingJobCache.containsKey(jobId)) {
+
+ synchronized (this) {
+ GramJob gramJob = currentlyExecutingJobCache.get(jobId);
+
+ // Even though we check using containsKey, at this point job could be null
+ // Fix: use && here. The previous || condition was a tautology (a status
+ // cannot equal both DONE and FAILED), so finished jobs were still cancelled.
+ if (gramJob != null && (gramJob.getStatus() != GRAMConstants.STATUS_DONE &&
+ gramJob.getStatus() != GRAMConstants.STATUS_FAILED)) {
+ cancelJob(gramJob, context);
+ }
+ }
+
+ } else {
+
+ try {
+ GSSCredential gssCred = ((GSISecurityContext)context.
+ getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+
+ GramJob gramJob = new GramJob(null);
+ try {
+ gramJob.setID(jobId);
+ } catch (MalformedURLException e) {
+ throw new GFacException("Invalid job id - " + jobId, e);
+ }
+ gramJob.setCredentials(gssCred);
+
+ synchronized (this) {
+ // Same fix as above: only cancel when the job is in a non-terminal state.
+ if (gramJob.getStatus() != GRAMConstants.STATUS_DONE &&
+ gramJob.getStatus() != GRAMConstants.STATUS_FAILED) {
+ cancelJob(gramJob, context);
+ }
+ }
+ } catch (ApplicationSettingsException e) {
+ throw new GFacException(e);
+ }
+ }
+ }
+
+ /**
+ * Issues a cancel on the given GramJob. If the first attempt fails with a
+ * GSSException (invalid credentials), the credentials are renewed and the
+ * cancel is retried once, followed by a SIGNAL_COMMIT_END.
+ *
+ * @param gramJob the job to cancel (credentials must already be set)
+ * @param context execution context used to renew credentials on retry
+ * @throws GFacException if the cancel fails, even after credential renewal
+ */
+ private void cancelJob(GramJob gramJob, JobExecutionContext context) throws GFacException{
+
+ try {
+ gramJob.cancel();
+ } catch (GramException e) {
+ throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e);
+ } catch (GSSException e) {
+
+ log.warn("Credentials invalid to cancel job. Attempting to renew credentials and re-try. " +
+ "Job id - " + gramJob.getIDAsString());
+ renewCredentials(gramJob, context);
+
+ try {
+ gramJob.cancel();
+ gramJob.signal(GramJob.SIGNAL_COMMIT_END);
+ } catch (GramException e1) {
+ throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e1);
+ } catch (GSSException e1) {
+ // Fix: propagate e1 (the failure after renewal) as the cause instead of
+ // the original e - the retry failure is the relevant error here.
+ throw new GFacException("Error cancelling job, invalid credentials. Job id - "
+ + gramJob.getIDAsString(), e1);
+ }
+ }
+
+ }
+
+ /**
+ * No-op: this provider does not consume any externally supplied properties.
+ */
+ public void initProperties(Map<String, String> properties) throws GFacException {
+
+ }
+
+ /**
+ * Inspects the current GRAM status reported by the listener and reacts to it:
+ * on STATUS_FAILED it either retries once (invalid-script-reply and
+ * authorization errors each guarded by a one-shot boolean flag), logs a user
+ * cancellation, or removes the job from the cache and throws a
+ * JobSubmissionFault; on STATUS_DONE it logs success and removes the job
+ * from the cache.
+ *
+ * @param jobExecutionContext context of the job being checked
+ * @param host host description used for error reporting and re-submission
+ * @param gateKeeper gatekeeper contact string used for re-submission and fault details
+ * @throws GFacProviderException on re-submission failure or terminal job failure
+ */
+ private void checkJobStatus(JobExecutionContext jobExecutionContext, GlobusHostType host, String gateKeeper)
+ throws GFacProviderException {
+ int jobStatus = listener.getCurrentStatus();
+
+ if (jobStatus == GramJob.STATUS_FAILED) {
+
+ String errorMsg = "Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " Job Exit Code = "
+ + listener.getError() + " Error Description = " + getGramErrorString(listener.getError());
+
+ if (listener.getError() == GRAMProtocolErrorConstants.INVALID_SCRIPT_REPLY) {
+
+ // re-submitting without renewing credentials
+ // TODO verify why we re-submit jobs when we get an invalid script reply
+ // The reSubmissionInProgress flag limits this path to a single retry.
+ if (!reSubmissionInProgress) {
+ reSubmissionInProgress = true;
+
+ log.info("Invalid script reply received. Re-submitting job, id - " + job.getIDAsString());
+ try {
+ reSubmitJob(gateKeeper, jobExecutionContext, host);
+ } catch (GFacException e) {
+ throw new GFacProviderException
+ ("Error during re-submission. Original job submission data - " + errorMsg, e);
+ }
+ return;
+ }
+
+ } else if (listener.getError() == GRAMProtocolErrorConstants.ERROR_AUTHORIZATION) {
+
+ // re-submit with renewed credentials; authorisationFailedAttempt
+ // limits this path to a single retry as well.
+ if (!authorisationFailedAttempt) {
+ authorisationFailedAttempt = true;
+ log.info("Authorisation error contacting provider. Re-submitting job with renewed credentials.");
+
+ try {
+ renewCredentials(jobExecutionContext);
+ reSubmitJob(gateKeeper, jobExecutionContext, host);
+ } catch (GFacException e) {
+ throw new GFacProviderException
+ ("Error during re-submission. Original job submission data - " + errorMsg, e);
+ }
+
+ return;
+ }
+
+ } else if (listener.getError() == GRAMProtocolErrorConstants.USER_CANCELLED) {
+
+ // A user-initiated cancel is not an error; just log and return.
+ log.info("User successfully cancelled job id " + job.getIDAsString());
+ return;
+ }
+
+ // Unrecoverable failure (or retry already attempted): drop the job from
+ // the cache and surface the failure to the caller.
+
+ log.error(errorMsg);
+
+ synchronized (this) {
+ currentlyExecutingJobCache.remove(job.getIDAsString());
+ }
+
+ throw new JobSubmissionFault(new Exception(errorMsg), host.getHostAddress(), gateKeeper,
+ job.getRSL(), jobExecutionContext, getGramErrorString(listener.getError()),
+ listener.getError());
+
+ } else if (jobStatus == GramJob.STATUS_DONE) {
+ log.info("Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " is successfully executed.");
+
+ synchronized (this) {
+ currentlyExecutingJobCache.remove(job.getIDAsString());
+ }
+ }
+ }
+
+ public String getGramErrorString(int errorCode) {
+
+ if (resources != null) {
+ try {
+ return resources.getProperty(String.valueOf(errorCode));
+ } catch (MissingResourceException mre) {
+ log.warn("Error reading globus error descriptions.", mre);
+ return "Error code: " + errorCode;
+ }
+ } else {
+ return "Error code: " + errorCode;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
deleted file mode 100644
index 5ba5ebf..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.impl;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.cpi.GFacImpl;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.handler.ThreadedHandler;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
-import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class GSISSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- log.info("Invoking GSISSH Provider Invoke ...");
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- String taskID = jobExecutionContext.getTaskData().getTaskID();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
- log.info(jobDescriptor.toXML());
-
- jobDetails.setJobDescription(jobDescriptor.toXML());
-
- String jobID = cluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if(jobID == null){
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- }else{
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
-
-
- // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
- // to perform monitoring, daemon handlers can be accessed from anywhere
- List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
- ThreadedHandler pullMonitorHandler = null;
- for(ThreadedHandler threadedHandler:daemonHandlers){
- if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
- pullMonitorHandler = threadedHandler;
- }
- }
- // we know this hos is type GsiSSHHostType
- String monitorMode = ((GsisshHostType) host).getMonitorMode();
- if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
- log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
- pullMonitorHandler.invoke(jobExecutionContext);
- }else{
- log.error("Currently we only support Pull monitoring");
- }
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- }
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
new file mode 100644
index 0000000..abca9d9
--- /dev/null
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.provider.impl;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
+import org.apache.airavata.gfac.handler.ThreadedHandler;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * GFac provider that submits HPC batch jobs over GSISSH. It obtains a Cluster
+ * from the GSI security context, builds a JobDescriptor from the application
+ * deployment description, submits the batch job, records the resulting job
+ * status (SUBMITTED or FAILED) and hands the job to the pull-monitoring
+ * daemon handler.
+ */
+public class GSISSHProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
+
+ /** No-op: this provider does not consume externally supplied properties. */
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+
+ /** Delegates initialization to AbstractProvider. */
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ super.initialize(jobExecutionContext);
+ }
+
+ /**
+ * Submits the job described by the execution context to the remote cluster
+ * and registers it for pull-mode monitoring.
+ *
+ * @param jobExecutionContext carries host/application descriptions, task data
+ * and the GSI security context
+ * @throws GFacProviderException if the security context is missing, submission
+ * fails, or an unexpected error occurs (status and error details are
+ * persisted before re-throwing)
+ */
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ log.info("Invoking GSISSH Provider Invoke ...");
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ HostDescriptionType host = jobExecutionContext.getApplicationContext().
+ getHostDescription().getType();
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+ JobDetails jobDetails = new JobDetails();
+ // NOTE(review): taskID is computed but never used below - confirm whether it
+ // can be removed or was intended for the job details.
+ String taskID = jobExecutionContext.getTaskData().getTaskID();
+ try {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ // This installed path is a mandatory field, because this could change based on the computing resource
+ JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+
+ log.info(jobDescriptor.toXML());
+
+ jobDetails.setJobDescription(jobDescriptor.toXML());
+
+ String jobID = cluster.submitBatchJob(jobDescriptor);
+ jobExecutionContext.setJobDetails(jobDetails);
+ if(jobID == null){
+ // A null id means the submission did not yield a job handle; record FAILED.
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ }else{
+ jobDetails.setJobID(jobID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ }
+
+
+ // Now that the job has been submitted to the resource, it is up to the Provider
+ // to pass the information to a daemon handler to perform monitoring;
+ // daemon handlers can be accessed from anywhere
+ List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+ ThreadedHandler pullMonitorHandler = null;
+ for(ThreadedHandler threadedHandler:daemonHandlers){
+ if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
+ pullMonitorHandler = threadedHandler;
+ }
+ }
+ // we know this host is of type GsisshHostType
+ String monitorMode = ((GsisshHostType) host).getMonitorMode();
+ if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
+ log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
+ // NOTE(review): pullMonitorHandler may still be null here if no
+ // GridPullMonitorHandler daemon is registered - confirm this cannot NPE.
+ pullMonitorHandler.invoke(jobExecutionContext);
+ }else{
+ log.error("Currently we only support Pull monitoring");
+ }
+ } catch (SSHApiException e) {
+ // Persist FAILED status and error details before surfacing the failure.
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ } catch (Exception e) {
+ // Catch-all boundary: same persistence as above for unexpected failures.
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ }
+ }
+
+ /** No-op: this provider performs no cleanup after job execution. */
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ }
+
+ /** Not supported: job cancellation is not implemented for this provider. */
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
deleted file mode 100644
index f0a0bf9..0000000
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.local.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
-import org.apache.airavata.gfac.local.utils.InputUtils;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.provider.utils.ProviderUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.utils.OutputUtils;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.NameValuePairType;
-import org.apache.xmlbeans.XmlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-public class LocalProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
- private ProcessBuilder builder;
- private List<String> cmdList;
- private String jobId;
-
- public static class LocalProviderJobData{
- private String applicationName;
- private List<String> inputParameters;
- private String workingDir;
- private String inputDir;
- private String outputDir;
- public String getApplicationName() {
- return applicationName;
- }
- public void setApplicationName(String applicationName) {
- this.applicationName = applicationName;
- }
- public List<String> getInputParameters() {
- return inputParameters;
- }
- public void setInputParameters(List<String> inputParameters) {
- this.inputParameters = inputParameters;
- }
- public String getWorkingDir() {
- return workingDir;
- }
- public void setWorkingDir(String workingDir) {
- this.workingDir = workingDir;
- }
- public String getInputDir() {
- return inputDir;
- }
- public void setInputDir(String inputDir) {
- this.inputDir = inputDir;
- }
- public String getOutputDir() {
- return outputDir;
- }
- public void setOutputDir(String outputDir) {
- this.outputDir = outputDir;
- }
- }
- public LocalProvider(){
- cmdList = new ArrayList<String>();
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
- super.initialize(jobExecutionContext);
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
-
- buildCommand(app.getExecutableLocation(), ProviderUtils.getInputParameters(jobExecutionContext));
- initProcessBuilder(app);
-
- // extra environment variables
- builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory());
- builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory());
-
- // set working directory
- builder.directory(new File(app.getStaticWorkingDirectory()));
-
- // log info
- log.info("Command = " + InputUtils.buildCommand(cmdList));
- log.info("Working dir = " + builder.directory());
- for (String key : builder.environment().keySet()) {
- log.info("Env[" + key + "] = " + builder.environment().get(key));
- }
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- ApplicationDeploymentDescriptionType app = jobExecutionContext.
- getApplicationContext().getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- try {
- jobId = jobExecutionContext.getTaskData().getTaskID();
- jobDetails.setJobID(jobId);
- jobDetails.setJobDescription(app.toString());
- jobExecutionContext.setJobDetails(jobDetails);
- jobDetails.setJobDescription(app.toString());
- GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
- // running cmd
- Process process = builder.start();
-
- Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
- Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
-
- // start output threads
- standardOutWriter.setDaemon(true);
- standardErrorWriter.setDaemon(true);
- standardOutWriter.start();
- standardErrorWriter.start();
-
- int returnValue = process.waitFor();
-
- // make sure other two threads are done
- standardOutWriter.join();
- standardErrorWriter.join();
-
- /*
- * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
- * just provide warning in the log messages
- */
- if (returnValue != 0) {
- log.error("Process finished with non zero return value. Process may have failed");
- } else {
- log.info("Process finished with return value of zero.");
- }
-
- StringBuffer buf = new StringBuffer();
- buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
- .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory())
- .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
- .append(String.valueOf(returnValue));
- log.info(buf.toString());
- } catch (IOException io) {
- throw new GFacProviderException(io.getMessage(), io);
- } catch (InterruptedException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
-
-// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
-// throws GFacProviderException {
-// ApplicationDeploymentDescriptionType app = jobExecutionContext.
-// getApplicationContext().getApplicationDeploymentDescription().getType();
-// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
-// appJob.setJobId(jobId);
-// LocalProviderJobData data = new LocalProviderJobData();
-// data.setApplicationName(app.getExecutableLocation());
-// data.setInputDir(app.getInputDataDirectory());
-// data.setOutputDir(app.getOutputDataDirectory());
-// data.setWorkingDir(builder.directory().toString());
-// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
-// ByteArrayOutputStream stream = new ByteArrayOutputStream();
-// JAXB.marshal(data, stream);
-// appJob.setJobData(stream.toString());
-// appJob.setSubmittedTime(Calendar.getInstance().getTime());
-// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
-// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
-// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
-// }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-
- try {
- String stdOutStr = GFacUtils.readFileToString(app.getStandardOutput());
- String stdErrStr = GFacUtils.readFileToString(app.getStandardError());
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- } catch (XmlException e) {
- throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
- } catch (IOException io) {
- throw new GFacProviderException(io.getMessage(), io);
- } catch (Exception e){
- throw new GFacProviderException("Error in retrieving results",e);
- }
- }
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- throw new NotImplementedException();
- }
-
-
- private void buildCommand(String executable, List<String> inputParameterList){
- cmdList.add(executable);
- cmdList.addAll(inputParameterList);
- }
-
- private void initProcessBuilder(ApplicationDeploymentDescriptionType app){
- builder = new ProcessBuilder(cmdList);
-
- NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
- if(env != null && env.length > 0){
- Map<String,String> builderEnv = builder.environment();
- for (NameValuePairType entry : env) {
- builderEnv.put(entry.getName(), entry.getValue());
- }
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
new file mode 100644
index 0000000..e4d55b8
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.provider.utils.ProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+public class LocalProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
+ private ProcessBuilder builder;
+ private List<String> cmdList;
+ private String jobId;
+
+ public static class LocalProviderJobData{
+ private String applicationName;
+ private List<String> inputParameters;
+ private String workingDir;
+ private String inputDir;
+ private String outputDir;
+ public String getApplicationName() {
+ return applicationName;
+ }
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+ public List<String> getInputParameters() {
+ return inputParameters;
+ }
+ public void setInputParameters(List<String> inputParameters) {
+ this.inputParameters = inputParameters;
+ }
+ public String getWorkingDir() {
+ return workingDir;
+ }
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+ public String getInputDir() {
+ return inputDir;
+ }
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+ public String getOutputDir() {
+ return outputDir;
+ }
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+ }
+ public LocalProvider(){
+ cmdList = new ArrayList<String>();
+ }
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+ super.initialize(jobExecutionContext);
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+
+ buildCommand(app.getExecutableLocation(), ProviderUtils.getInputParameters(jobExecutionContext));
+ initProcessBuilder(app);
+
+ // extra environment variables
+ builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory());
+ builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory());
+
+ // set working directory
+ builder.directory(new File(app.getStaticWorkingDirectory()));
+
+ // log info
+ log.info("Command = " + InputUtils.buildCommand(cmdList));
+ log.info("Working dir = " + builder.directory());
+ for (String key : builder.environment().keySet()) {
+ log.info("Env[" + key + "] = " + builder.environment().get(key));
+ }
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.
+ getApplicationContext().getApplicationDeploymentDescription().getType();
+ JobDetails jobDetails = new JobDetails();
+ try {
+ jobId = jobExecutionContext.getTaskData().getTaskID();
+ jobDetails.setJobID(jobId);
+ jobDetails.setJobDescription(app.toString());
+ jobExecutionContext.setJobDetails(jobDetails);
+ jobDetails.setJobDescription(app.toString());
+ GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
+ // running cmd
+ Process process = builder.start();
+
+ Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
+ Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
+
+ // start output threads
+ standardOutWriter.setDaemon(true);
+ standardErrorWriter.setDaemon(true);
+ standardOutWriter.start();
+ standardErrorWriter.start();
+
+ int returnValue = process.waitFor();
+
+ // make sure other two threads are done
+ standardOutWriter.join();
+ standardErrorWriter.join();
+
+ /*
+ * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+ * just provide warning in the log messages
+ */
+ if (returnValue != 0) {
+ log.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ log.info("Process finished with return value of zero.");
+ }
+
+ StringBuffer buf = new StringBuffer();
+ buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+ .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory())
+ .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
+ .append(String.valueOf(returnValue));
+ log.info(buf.toString());
+ } catch (IOException io) {
+ throw new GFacProviderException(io.getMessage(), io);
+ } catch (InterruptedException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+
+// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
+// throws GFacProviderException {
+// ApplicationDeploymentDescriptionType app = jobExecutionContext.
+// getApplicationContext().getApplicationDeploymentDescription().getType();
+// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+// appJob.setJobId(jobId);
+// LocalProviderJobData data = new LocalProviderJobData();
+// data.setApplicationName(app.getExecutableLocation());
+// data.setInputDir(app.getInputDataDirectory());
+// data.setOutputDir(app.getOutputDataDirectory());
+// data.setWorkingDir(builder.directory().toString());
+// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+// ByteArrayOutputStream stream = new ByteArrayOutputStream();
+// JAXB.marshal(data, stream);
+// appJob.setJobData(stream.toString());
+// appJob.setSubmittedTime(Calendar.getInstance().getTime());
+// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+// }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+
+ try {
+ String stdOutStr = GFacUtils.readFileToString(app.getStandardOutput());
+ String stdErrStr = GFacUtils.readFileToString(app.getStandardError());
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ } catch (XmlException e) {
+ throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
+ } catch (IOException io) {
+ throw new GFacProviderException(io.getMessage(), io);
+ } catch (Exception e){
+ throw new GFacProviderException("Error in retrieving results",e);
+ }
+ }
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+
+ private void buildCommand(String executable, List<String> inputParameterList){
+ cmdList.add(executable);
+ cmdList.addAll(inputParameterList);
+ }
+
+ private void initProcessBuilder(ApplicationDeploymentDescriptionType app){
+ builder = new ProcessBuilder(cmdList);
+
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+ if(env != null && env.length > 0){
+ Map<String,String> builderEnv = builder.environment();
+ for (NameValuePairType entry : env) {
+ builderEnv.put(entry.getName(), entry.getValue());
+ }
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index 902e424..afb101c 100644
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -27,7 +27,7 @@ import org.apache.airavata.gfac.context.ApplicationContext;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
-import org.apache.airavata.gfac.local.impl.LocalProvider;
+import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
index b9432da..8147853 100644
--- a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
+++ b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
@@ -19,7 +19,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
deleted file mode 100644
index ee3dcd2..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.ssh.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.handler.GFacHandlerException;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.CommandExecutor;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
-import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-/**
- * Execute application using remote SSH
- */
-public class SSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
- private Cluster cluster;
- private String jobID = null;
- private String taskID = null;
- // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
- private boolean hpcType = false;
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- taskID = jobExecutionContext.getTaskData().getTaskID();
- if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
- jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
-
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobID(taskID);
- details.setJobDescription(remoteFile);
- jobExecutionContext.setJobDetails(details);
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
- details.setJobDescription(jobDescriptor.toXML());
-
- GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
- log.info(remoteFile);
- try {
- File runscript = createShellScript(jobExecutionContext);
- cluster.scpTo(remoteFile, runscript.getAbsolutePath());
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
- }else{
- hpcType = true;
- }
- }
-
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- if (!hpcType) {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- try {
- /*
- * Execute
- */
- String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobDescription(execuable);
-
-// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
-
- StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
-
- CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
- String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
-
- log.info("stdout=" + stdOutputString);
-
-// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
- } catch (Exception e) {
- throw new GFacProviderException(e.getMessage(), e);
- } finally {
- if (cluster != null) {
- try {
- cluster.disconnect();
- } catch (SSHApiException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
- }
- } else {
- try {
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- String taskID = jobExecutionContext.getTaskData().getTaskID();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
- log.info(jobDescriptor.toXML());
-
- jobDetails.setJobDescription(jobDescriptor.toXML());
-
- String jobID = cluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if (jobID == null) {
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- } else {
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
-
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- }
- } catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-
- }
-
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- throw new NotImplementedException();
- }
-
-
- private File createShellScript(JobExecutionContext context) throws IOException {
- ApplicationDeploymentDescriptionType app = context.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
- + new Random().nextLong();
-
- File shellScript = File.createTempFile(uniqueDir, "sh");
- OutputStream out = new FileOutputStream(shellScript);
-
- out.write("#!/bin/bash\n".getBytes());
- out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
- out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
- out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
- .getBytes());
- // get the env of the host and the application
- NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
- Map<String, String> nv = new HashMap<String, String>();
- if (env != null) {
- for (int i = 0; i < env.length; i++) {
- String key = env[i].getName();
- String value = env[i].getValue();
- nv.put(key, value);
- }
- }
- for (Entry<String, String> entry : nv.entrySet()) {
- log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
- out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-
- }
-
- // prepare the command
- final String SPACE = " ";
- StringBuffer cmd = new StringBuffer();
- cmd.append(app.getExecutableLocation());
- cmd.append(SPACE);
-
- MessageContext input = context.getInMessageContext();
- ;
- Map<String, Object> inputs = input.getParameters();
- Set<String> keys = inputs.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
- if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
- for (String value : values) {
- cmd.append(value);
- cmd.append(SPACE);
- }
- } else {
- String paramValue = MappingFactory.toString(actualParameter);
- cmd.append(paramValue);
- cmd.append(SPACE);
- }
- }
- // We redirect the error and stdout to remote files, they will be read
- // in later
- cmd.append(SPACE);
- cmd.append("1>");
- cmd.append(SPACE);
- cmd.append(app.getStandardOutput());
- cmd.append(SPACE);
- cmd.append("2>");
- cmd.append(SPACE);
- cmd.append(app.getStandardError());
-
- String cmdStr = cmd.toString();
- log.info("Command = " + cmdStr);
- out.write((cmdStr + "\n").getBytes());
- String message = "\"execuationSuceeded\"";
- out.write(("echo " + message + "\n").getBytes());
- out.close();
-
- return shellScript;
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
- /**
- * This method will read standard output and if there's any it will be parsed
- * @param jobIDReaderCommandOutput
- * @param errorMsg
- * @return
- * @throws SSHApiException
- */
- private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
- String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
- String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
-
- if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
- log.error("Standard Error output : " + stdErrorString);
- throw new SSHApiException(errorMsg + stdErrorString);
- }
- return stdOutputString;
- }
-
-
-}
[3/3] git commit: fixing more packaging issue
Posted by la...@apache.org.
fixing more packaging issue
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6209ee09
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6209ee09
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6209ee09
Branch: refs/heads/master
Commit: 6209ee096e2dea80863cabaaad6acce6f2910ee9
Parents: 49b6987
Author: lahiru <la...@apache.org>
Authored: Mon May 5 11:06:59 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon May 5 11:06:59 2014 -0400
----------------------------------------------------------------------
.../server/src/main/resources/gfac-config.xml | 2 +-
.../airavata/gfac/bes/impl/BESProvider.java | 568 -------------------
.../gfac/bes/provider/impl/BESProvider.java | 568 +++++++++++++++++++
.../airavata/gfac/gram/impl/GramProvider.java | 527 -----------------
.../gfac/gram/provider/impl/GramProvider.java | 527 +++++++++++++++++
.../gfac/gsissh/impl/GSISSHProvider.java | 138 -----
.../gsissh/provider/impl/GSISSHProvider.java | 138 +++++
.../airavata/gfac/local/impl/LocalProvider.java | 239 --------
.../gfac/local/provider/impl/LocalProvider.java | 239 ++++++++
.../gfac/services/impl/LocalProviderTest.java | 2 +-
.../src/test/resources/gfac-config.xml | 2 +-
.../airavata/gfac/ssh/impl/SSHProvider.java | 310 ----------
.../gfac/ssh/provider/impl/SSHProvider.java | 310 ++++++++++
.../src/test/resources/gfac-config.xml | 2 +-
.../src/main/resources/gfac-config.xml | 2 +-
.../src/test/resources/gfac-config.xml | 2 +-
16 files changed, 1788 insertions(+), 1788 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/configuration/server/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml
index 0e95bc4..4cddfda 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.xml
+++ b/modules/configuration/server/src/main/resources/gfac-config.xml
@@ -24,7 +24,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
deleted file mode 100644
index c41632f..0000000
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.bes.impl;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigInteger;
-import java.security.InvalidKeyException;
-import java.security.KeyPair;
-import java.security.KeyPairGenerator;
-import java.security.PrivateKey;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-
-import javax.security.auth.x500.X500Principal;
-
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.bes.security.GSISecurityContext;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
-import org.apache.airavata.gfac.notification.events.UnicoreJobIDEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.bes.utils.DataTransferrer;
-import org.apache.airavata.gfac.bes.utils.JSDLGenerator;
-import org.apache.airavata.gfac.bes.utils.StorageCreator;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.registry.api.workflow.ApplicationJob;
-import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
-import org.apache.airavata.schemas.gfac.UnicoreHostType;
-import org.apache.xmlbeans.XmlCursor;
-import org.bouncycastle.asn1.ASN1InputStream;
-import org.bouncycastle.asn1.x500.X500Name;
-import org.bouncycastle.asn1.x500.style.BCStyle;
-import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
-import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
-import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
-import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration.Enum;
-import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
-import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityDocument;
-import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityResponseDocument;
-import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesDocument;
-import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesResponseDocument;
-import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
-import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3.x2005.x08.addressing.EndpointReferenceType;
-
-import de.fzj.unicore.bes.client.FactoryClient;
-import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
-import de.fzj.unicore.uas.client.StorageClient;
-import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
-import eu.emi.security.authn.x509.helpers.CertificateHelpers;
-import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
-import eu.emi.security.authn.x509.impl.CertificateUtils;
-import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
-import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
-import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
-import eu.emi.security.authn.x509.impl.X500NameUtils;
-import eu.unicore.util.httpclient.DefaultClientConfiguration;
-
-
-
-public class BESProvider extends AbstractProvider {
- protected final Logger log = LoggerFactory.getLogger(this.getClass());
-
- private DefaultClientConfiguration secProperties;
-
- private String jobId;
-
-
-
- public void initialize(JobExecutionContext jobExecutionContext)
- throws GFacProviderException, GFacException {
- log.info("Initializing UNICORE Provider");
- super.initialize(jobExecutionContext);
- initSecurityProperties(jobExecutionContext);
- log.debug("initialized security properties");
- }
-
-
- public void execute(JobExecutionContext jobExecutionContext)
- throws GFacProviderException {
- UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
- .getType();
-
- String factoryUrl = host.getUnicoreBESEndPointArray()[0];
-
- EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
- eprt.addNewAddress().setStringValue(factoryUrl);
-
- String userDN = getUserName(jobExecutionContext);
-
- if (userDN == null || userDN.equalsIgnoreCase("admin")) {
- userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
- }
-
- String xlogin = getCNFromUserDN(userDN);
- // create storage
- StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, xlogin);
-
- StorageClient sc = null;
- try {
- try {
- sc = storageCreator.createStorage();
- } catch (Exception e2) {
- log.error("Cannot create storage..");
- throw new GFacProviderException("Cannot create storage..", e2);
- }
-
- CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
- JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory.newInstance();
-
- JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition();
- try {
- jobDefinition = JSDLGenerator.buildJSDLInstance(jobExecutionContext, sc.getUrl()).getJobDefinition();
- cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
-
- log.info("JSDL" + jobDefDoc.toString());
- } catch (Exception e1) {
- throw new GFacProviderException("Cannot generate JSDL instance from the JobExecutionContext.", e1);
- }
-
- // upload files if any
- DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
- dt.uploadLocalFiles();
-
- FactoryClient factory = null;
- try {
- factory = new FactoryClient(eprt, secProperties);
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
-
- CreateActivityResponseDocument response = null;
- try {
- log.info(String.format("Activity Submitting to %s ... \n", factoryUrl));
- response = factory.createActivity(cad);
- log.info(String.format("Activity Submitted to %s \n", factoryUrl));
- } catch (Exception e) {
- throw new GFacProviderException("Cannot create activity.", e);
- }
- EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
-
- log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
-
- // factory.waitWhileActivityIsDone(activityEpr, 1000);
- jobId = WSUtilities.extractResourceID(activityEpr);
- if (jobId == null) {
- jobId = new Long(Calendar.getInstance().getTimeInMillis()).toString();
- }
- log.info("JobID: " + jobId);
- jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
- saveApplicationJob(jobExecutionContext, jobDefinition, activityEpr.toString());
-
- factory.getActivityStatus(activityEpr);
- log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(),
- factory.getActivityStatus(activityEpr).toString()));
-
- // TODO publish the status messages to the message bus
- while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
- && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
- && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
-
- ActivityStatusType activityStatus = null;
- try {
- activityStatus = getStatus(factory, activityEpr);
- JobState jobStatus = getApplicationJobStatus(activityStatus);
- String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
- jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
- details.setJobID(jobId);
- GFacUtils.updateJobStatus(jobExecutionContext, details, jobStatus);
- } catch (UnknownActivityIdentifierFault e) {
- throw new GFacProviderException(e.getMessage(), e.getCause());
- }catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e.getCause());
- }
-
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- continue;
- }
-
- ActivityStatusType activityStatus = null;
- try {
- activityStatus = getStatus(factory, activityEpr);
- } catch (UnknownActivityIdentifierFault e) {
- throw new GFacProviderException(e.getMessage(), e.getCause());
- }
-
- log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState()
- .toString()));
-
- if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
- String error = activityStatus.getFault().getFaultcode().getLocalPart() + "\n"
- + activityStatus.getFault().getFaultstring() + "\n EXITCODE: " + activityStatus.getExitCode();
- log.info(error);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- dt.downloadStdOuts();
- } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
- String experimentID = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
- JobState jobStatus = JobState.CANCELED;
- String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
- jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
- details.setJobID(jobId);
- try {
- GFacUtils.saveJobStatus(jobExecutionContext,details, jobStatus);
- } catch (GFacException e) {
- throw new GFacProviderException(e.getLocalizedMessage(),e);
- }
- throw new GFacProviderException(experimentID + "Job Canceled");
- }
-
- else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- if (activityStatus.getExitCode() == 0) {
- dt.downloadRemoteFiles();
- } else {
- dt.downloadStdOuts();
- }
- }
-
- } catch (UnknownActivityIdentifierFault e1) {
- throw new GFacProviderException(e1.getLocalizedMessage(), e1);
- } finally {
- // destroy sms instance
- try {
- if (sc != null) {
- sc.destroy();
- }
- } catch (Exception e) {
- log.warn("Cannot destroy temporary SMS instance:" + sc.getUrl(), e);
- }
- }
- }
-
- private JobState getApplicationJobStatus(ActivityStatusType activityStatus){
- if (activityStatus == null) {
- return JobState.UNKNOWN;
- }
- Enum state = activityStatus.getState();
- String status = null;
- XmlCursor acursor = activityStatus.newCursor();
- try {
- if (acursor.toFirstChild()) {
- if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
- status = acursor.getName().getLocalPart();
- }
- }
- if (status != null) {
- if (status.equalsIgnoreCase("Queued") || status.equalsIgnoreCase("Starting")
- || status.equalsIgnoreCase("Ready")) {
- return JobState.QUEUED;
- } else if (status.equalsIgnoreCase("Staging-In")) {
- return JobState.SUBMITTED;
- } else if (status.equalsIgnoreCase("Staging-Out") || status.equalsIgnoreCase("FINISHED")) {
- return JobState.COMPLETE;
- } else if (status.equalsIgnoreCase("Executing")) {
- return JobState.ACTIVE;
- } else if (status.equalsIgnoreCase("FAILED")) {
- return JobState.FAILED;
- } else if (status.equalsIgnoreCase("CANCELLED")) {
- return JobState.CANCELED;
- }
- } else {
- if (ActivityStateEnumeration.CANCELLED.equals(state)) {
- return JobState.CANCELED;
- } else if (ActivityStateEnumeration.FAILED.equals(state)) {
- return JobState.FAILED;
- } else if (ActivityStateEnumeration.FINISHED.equals(state)) {
- return JobState.COMPLETE;
- } else if (ActivityStateEnumeration.RUNNING.equals(state)) {
- return JobState.ACTIVE;
- }
- }
- } finally {
- if (acursor != null)
- acursor.dispose();
- }
- return JobState.UNKNOWN;
- }
-
- private void saveApplicationJob(JobExecutionContext jobExecutionContext, JobDefinitionType jobDefinition,
- String metadata) {
- ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
- appJob.setJobId(jobId);
- appJob.setJobData(jobDefinition.toString());
- appJob.setSubmittedTime(Calendar.getInstance().getTime());
- appJob.setStatus(ApplicationJobStatus.SUBMITTED);
- appJob.setStatusUpdateTime(appJob.getSubmittedTime());
- appJob.setMetadata(metadata);
- GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- secProperties = null;
- }
-
- /**
- * EndpointReference need to be saved to make cancel work.
- *
- * @param activityEpr
- * @param jobExecutionContext
- * @throws GFacProviderException
- */
- public void cancelJob(String activityEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
- try {
- initSecurityProperties(jobExecutionContext);
- EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(activityEpr);
- UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
- .getType();
-
- String factoryUrl = host.getUnicoreBESEndPointArray()[0];
- EndpointReferenceType epr = EndpointReferenceType.Factory.newInstance();
- epr.addNewAddress().setStringValue(factoryUrl);
-
- FactoryClient factory = new FactoryClient(epr, secProperties);
- factory.terminateActivity(eprt);
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(),e);
- }
-
- }
-
- protected void downloadOffline(String smsEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
- try {
- initSecurityProperties(jobExecutionContext);
- EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(smsEpr);
- StorageClient sms = new StorageClient(eprt, secProperties);
- DataTransferrer dt = new DataTransferrer(jobExecutionContext, sms);
- // there must be output files there
- // this is also possible if client is re-connected, the jobs are
- // still
- // running and no output is produced
- dt.downloadRemoteFiles();
-
- // may be use the below method before downloading for checking
- // the number of entries
- // sms.listDirectory(".");
-
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
- }
-
- protected void initSecurityProperties(JobExecutionContext jobExecutionContext) throws GFacProviderException,
- GFacException {
-
- if (secProperties != null)
- return;
-
- GSISecurityContext gssContext = (GSISecurityContext) jobExecutionContext
- .getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT);
-
- try {
- String certLocation = gssContext.getTrustedCertificatePath();
- List<String> trustedCert = new ArrayList<String>();
- trustedCert.add(certLocation + "/*.0");
- trustedCert.add(certLocation + "/*.pem");
-
- DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1,
- 60000, null);
-
- String userID = getUserName(jobExecutionContext);
-
- if ( userID == null || "".equals(userID) || userID.equalsIgnoreCase("admin") ) {
- userID = "CN=zdv575, O=Ultrascan Gateway, C=DE";
- }
-
- String userDN = userID.replaceAll("^\"|\"$", "");
-
- // TODO: should be changed to default airavata server locations
- KeyAndCertCredential cred = generateShortLivedCertificate(userDN, certLocation
- + "/cacert.pem", certLocation
- + "/cakey.pem", "ultrascan3");
- secProperties = new DefaultClientConfiguration(dcValidator, cred);
-
- // secProperties.doSSLAuthn();
- secProperties.getETDSettings().setExtendTrustDelegation(true);
-
- secProperties.setDoSignMessage(true);
-
- String[] outHandlers = secProperties.getOutHandlerClassNames();
-
- Set<String> outHandlerLst = null;
-
- // timeout in milliseconds
- Properties p = secProperties.getExtraSettings();
- p.setProperty("http.connection.timeout", "300000");
- p.setProperty("http.socket.timeout", "300000");
-
- if (outHandlers == null) {
- outHandlerLst = new HashSet<String>();
- } else {
- outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
- }
-
- outHandlerLst.add("de.fzj.unicore.uas.security.ProxyCertOutHandler");
-
- secProperties.setOutHandlerClassNames(outHandlerLst.toArray(new String[outHandlerLst.size()]));
-
- } catch (Exception e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
-
- //FIXME: Get user details
- private String getUserName(JobExecutionContext context) {
-// if (context.getConfigurationData()!= null) {
-// return context.getConfigurationData().getBasicMetadata().getUserName();
-// } else {
- return "";
-// }
- }
-
- protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
- throws UnknownActivityIdentifierFault {
-
- GetActivityStatusesDocument stats = GetActivityStatusesDocument.Factory.newInstance();
-
- stats.addNewGetActivityStatuses().setActivityIdentifierArray(new EndpointReferenceType[] { activityEpr });
-
- GetActivityStatusesResponseDocument resDoc = fc.getActivityStatuses(stats);
-
- ActivityStatusType activityStatus = resDoc.getGetActivityStatusesResponse().getResponseArray()[0]
- .getActivityStatus();
- return activityStatus;
- }
-
- protected String formatStatusMessage(String activityUrl, String status) {
- return String.format("Activity %s is %s.\n", activityUrl, status);
- }
-
- protected String subStatusAsString(ActivityStatusType statusType) {
-
- StringBuffer sb = new StringBuffer();
-
- sb.append(statusType.getState().toString());
-
- XmlCursor acursor = statusType.newCursor();
- if (acursor.toFirstChild()) {
- do {
- if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
- sb.append(":");
- sb.append(acursor.getName().getLocalPart());
- }
- } while (acursor.toNextSibling());
- acursor.dispose();
- return sb.toString();
- } else {
- acursor.dispose();
- return sb.toString();
- }
-
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-
- protected KeyAndCertCredential generateShortLivedCertificate(String userDN, String caCertPath, String caKeyPath,
- String caPwd) throws Exception {
- final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes
- // ago
-
- final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset;
- final long endTime = startTime + 30 * 3600 * 1000;
-
- String keyLengthProp = "1024";
- int keyLength = Integer.parseInt(keyLengthProp);
- String signatureAlgorithm = "SHA1withRSA";
-
- KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
-
- KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
- kpg.initialize(keyLength);
- KeyPair pair = kpg.generateKeyPair();
-
- X500Principal subjectDN = new X500Principal(userDN);
- Random rand = new Random();
-
- SubjectPublicKeyInfo publicKeyInfo;
- try {
- publicKeyInfo = SubjectPublicKeyInfo.getInstance(new ASN1InputStream(pair.getPublic().getEncoded())
- .readObject());
- } catch (IOException e) {
- throw new InvalidKeyException("Can not parse the public key"
- + "being included in the short lived certificate", e);
- }
-
- X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
-
- X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
-
- X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, new BigInteger(20, rand),
- new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo);
-
- AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
-
- X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null);
-
- certificate.checkValidity(new Date());
- certificate.verify(caCred.getCertificate().getPublicKey());
- KeyAndCertCredential result = new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
- caCred.getCertificate() });
-
- return result;
- }
-
- private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception {
- InputStream isKey = new FileInputStream(caKeyPath);
- PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, password.toCharArray());
-
- InputStream isCert = new FileInputStream(caCertPath);
- X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
-
- if (isKey != null)
- isKey.close();
- if (isCert != null)
- isCert.close();
-
- return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
- }
-
- private String getCNFromUserDN(String userDN) {
- return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
new file mode 100644
index 0000000..cebb7f8
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -0,0 +1,568 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.bes.provider.impl;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import javax.security.auth.x500.X500Principal;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.bes.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
+import org.apache.airavata.gfac.notification.events.UnicoreJobIDEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.bes.utils.DataTransferrer;
+import org.apache.airavata.gfac.bes.utils.JSDLGenerator;
+import org.apache.airavata.gfac.bes.utils.StorageCreator;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.apache.xmlbeans.XmlCursor;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration.Enum;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
+import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityResponseDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesResponseDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import de.fzj.unicore.bes.client.FactoryClient;
+import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+
+
+
+public class BESProvider extends AbstractProvider {
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private DefaultClientConfiguration secProperties;
+
+ private String jobId;
+
+
+
+ public void initialize(JobExecutionContext jobExecutionContext)
+ throws GFacProviderException, GFacException {
+ log.info("Initializing UNICORE Provider");
+ super.initialize(jobExecutionContext);
+ initSecurityProperties(jobExecutionContext);
+ log.debug("initialized security properties");
+ }
+
+
+ public void execute(JobExecutionContext jobExecutionContext)
+ throws GFacProviderException {
+ UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
+ .getType();
+
+ String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
+ eprt.addNewAddress().setStringValue(factoryUrl);
+
+ String userDN = getUserName(jobExecutionContext);
+
+ if (userDN == null || userDN.equalsIgnoreCase("admin")) {
+ userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+
+ String xlogin = getCNFromUserDN(userDN);
+ // create storage
+ StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, xlogin);
+
+ StorageClient sc = null;
+ try {
+ try {
+ sc = storageCreator.createStorage();
+ } catch (Exception e2) {
+ log.error("Cannot create storage..");
+ throw new GFacProviderException("Cannot create storage..", e2);
+ }
+
+ CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory.newInstance();
+
+ JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition();
+ try {
+ jobDefinition = JSDLGenerator.buildJSDLInstance(jobExecutionContext, sc.getUrl()).getJobDefinition();
+ cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
+
+ log.info("JSDL" + jobDefDoc.toString());
+ } catch (Exception e1) {
+ throw new GFacProviderException("Cannot generate JSDL instance from the JobExecutionContext.", e1);
+ }
+
+ // upload files if any
+ DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
+ dt.uploadLocalFiles();
+
+ FactoryClient factory = null;
+ try {
+ factory = new FactoryClient(eprt, secProperties);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+
+ CreateActivityResponseDocument response = null;
+ try {
+ log.info(String.format("Activity Submitting to %s ... \n", factoryUrl));
+ response = factory.createActivity(cad);
+ log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+ } catch (Exception e) {
+ throw new GFacProviderException("Cannot create activity.", e);
+ }
+ EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+ log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+ // factory.waitWhileActivityIsDone(activityEpr, 1000);
+ jobId = WSUtilities.extractResourceID(activityEpr);
+ if (jobId == null) {
+ jobId = new Long(Calendar.getInstance().getTimeInMillis()).toString();
+ }
+ log.info("JobID: " + jobId);
+ jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
+ saveApplicationJob(jobExecutionContext, jobDefinition, activityEpr.toString());
+
+ factory.getActivityStatus(activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(),
+ factory.getActivityStatus(activityEpr).toString()));
+
+ // TODO publish the status messages to the message bus
+ while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
+
+ ActivityStatusType activityStatus = null;
+ try {
+ activityStatus = getStatus(factory, activityEpr);
+ JobState jobStatus = getApplicationJobStatus(activityStatus);
+ String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
+ jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
+ details.setJobID(jobId);
+ GFacUtils.updateJobStatus(jobExecutionContext, details, jobStatus);
+ } catch (UnknownActivityIdentifierFault e) {
+ throw new GFacProviderException(e.getMessage(), e.getCause());
+ }catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e.getCause());
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+
+ ActivityStatusType activityStatus = null;
+ try {
+ activityStatus = getStatus(factory, activityEpr);
+ } catch (UnknownActivityIdentifierFault e) {
+ throw new GFacProviderException(e.getMessage(), e.getCause());
+ }
+
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState()
+ .toString()));
+
+ if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+ String error = activityStatus.getFault().getFaultcode().getLocalPart() + "\n"
+ + activityStatus.getFault().getFaultstring() + "\n EXITCODE: " + activityStatus.getExitCode();
+ log.info(error);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ dt.downloadStdOuts();
+ } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+ String experimentID = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+ JobState jobStatus = JobState.CANCELED;
+ String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
+ jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
+ details.setJobID(jobId);
+ try {
+ GFacUtils.saveJobStatus(jobExecutionContext,details, jobStatus);
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+ throw new GFacProviderException(experimentID + "Job Canceled");
+ }
+
+ else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ if (activityStatus.getExitCode() == 0) {
+ dt.downloadRemoteFiles();
+ } else {
+ dt.downloadStdOuts();
+ }
+ }
+
+ } catch (UnknownActivityIdentifierFault e1) {
+ throw new GFacProviderException(e1.getLocalizedMessage(), e1);
+ } finally {
+ // destroy sms instance
+ try {
+ if (sc != null) {
+ sc.destroy();
+ }
+ } catch (Exception e) {
+ log.warn("Cannot destroy temporary SMS instance:" + sc.getUrl(), e);
+ }
+ }
+ }
+
+ private JobState getApplicationJobStatus(ActivityStatusType activityStatus){
+ if (activityStatus == null) {
+ return JobState.UNKNOWN;
+ }
+ Enum state = activityStatus.getState();
+ String status = null;
+ XmlCursor acursor = activityStatus.newCursor();
+ try {
+ if (acursor.toFirstChild()) {
+ if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+ status = acursor.getName().getLocalPart();
+ }
+ }
+ if (status != null) {
+ if (status.equalsIgnoreCase("Queued") || status.equalsIgnoreCase("Starting")
+ || status.equalsIgnoreCase("Ready")) {
+ return JobState.QUEUED;
+ } else if (status.equalsIgnoreCase("Staging-In")) {
+ return JobState.SUBMITTED;
+ } else if (status.equalsIgnoreCase("Staging-Out") || status.equalsIgnoreCase("FINISHED")) {
+ return JobState.COMPLETE;
+ } else if (status.equalsIgnoreCase("Executing")) {
+ return JobState.ACTIVE;
+ } else if (status.equalsIgnoreCase("FAILED")) {
+ return JobState.FAILED;
+ } else if (status.equalsIgnoreCase("CANCELLED")) {
+ return JobState.CANCELED;
+ }
+ } else {
+ if (ActivityStateEnumeration.CANCELLED.equals(state)) {
+ return JobState.CANCELED;
+ } else if (ActivityStateEnumeration.FAILED.equals(state)) {
+ return JobState.FAILED;
+ } else if (ActivityStateEnumeration.FINISHED.equals(state)) {
+ return JobState.COMPLETE;
+ } else if (ActivityStateEnumeration.RUNNING.equals(state)) {
+ return JobState.ACTIVE;
+ }
+ }
+ } finally {
+ if (acursor != null)
+ acursor.dispose();
+ }
+ return JobState.UNKNOWN;
+ }
+
+ private void saveApplicationJob(JobExecutionContext jobExecutionContext, JobDefinitionType jobDefinition,
+ String metadata) {
+ ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+ appJob.setJobId(jobId);
+ appJob.setJobData(jobDefinition.toString());
+ appJob.setSubmittedTime(Calendar.getInstance().getTime());
+ appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+ appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+ appJob.setMetadata(metadata);
+ GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ secProperties = null;
+ }
+
+ /**
+ * EndpointReference need to be saved to make cancel work.
+ *
+ * @param activityEpr
+ * @param jobExecutionContext
+ * @throws GFacProviderException
+ */
+ public void cancelJob(String activityEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ try {
+ initSecurityProperties(jobExecutionContext);
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(activityEpr);
+ UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
+ .getType();
+
+ String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+ EndpointReferenceType epr = EndpointReferenceType.Factory.newInstance();
+ epr.addNewAddress().setStringValue(factoryUrl);
+
+ FactoryClient factory = new FactoryClient(epr, secProperties);
+ factory.terminateActivity(eprt);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+
+ }
+
+ protected void downloadOffline(String smsEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ try {
+ initSecurityProperties(jobExecutionContext);
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(smsEpr);
+ StorageClient sms = new StorageClient(eprt, secProperties);
+ DataTransferrer dt = new DataTransferrer(jobExecutionContext, sms);
+ // there must be output files there
+ // this is also possible if client is re-connected, the jobs are
+ // still
+ // running and no output is produced
+ dt.downloadRemoteFiles();
+
+ // may be use the below method before downloading for checking
+ // the number of entries
+ // sms.listDirectory(".");
+
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+ }
+
+ protected void initSecurityProperties(JobExecutionContext jobExecutionContext) throws GFacProviderException,
+ GFacException {
+
+ if (secProperties != null)
+ return;
+
+ GSISecurityContext gssContext = (GSISecurityContext) jobExecutionContext
+ .getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT);
+
+ try {
+ String certLocation = gssContext.getTrustedCertificatePath();
+ List<String> trustedCert = new ArrayList<String>();
+ trustedCert.add(certLocation + "/*.0");
+ trustedCert.add(certLocation + "/*.pem");
+
+ DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1,
+ 60000, null);
+
+ String userID = getUserName(jobExecutionContext);
+
+ if ( userID == null || "".equals(userID) || userID.equalsIgnoreCase("admin") ) {
+ userID = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+
+ String userDN = userID.replaceAll("^\"|\"$", "");
+
+ // TODO: should be changed to default airavata server locations
+ KeyAndCertCredential cred = generateShortLivedCertificate(userDN, certLocation
+ + "/cacert.pem", certLocation
+ + "/cakey.pem", "ultrascan3");
+ secProperties = new DefaultClientConfiguration(dcValidator, cred);
+
+ // secProperties.doSSLAuthn();
+ secProperties.getETDSettings().setExtendTrustDelegation(true);
+
+ secProperties.setDoSignMessage(true);
+
+ String[] outHandlers = secProperties.getOutHandlerClassNames();
+
+ Set<String> outHandlerLst = null;
+
+ // timeout in milliseconds
+ Properties p = secProperties.getExtraSettings();
+ p.setProperty("http.connection.timeout", "300000");
+ p.setProperty("http.socket.timeout", "300000");
+
+ if (outHandlers == null) {
+ outHandlerLst = new HashSet<String>();
+ } else {
+ outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
+ }
+
+ outHandlerLst.add("de.fzj.unicore.uas.security.ProxyCertOutHandler");
+
+ secProperties.setOutHandlerClassNames(outHandlerLst.toArray(new String[outHandlerLst.size()]));
+
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+
+ //FIXME: Get user details
+ private String getUserName(JobExecutionContext context) {
+// if (context.getConfigurationData()!= null) {
+// return context.getConfigurationData().getBasicMetadata().getUserName();
+// } else {
+ return "";
+// }
+ }
+
+ protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
+ throws UnknownActivityIdentifierFault {
+
+ GetActivityStatusesDocument stats = GetActivityStatusesDocument.Factory.newInstance();
+
+ stats.addNewGetActivityStatuses().setActivityIdentifierArray(new EndpointReferenceType[] { activityEpr });
+
+ GetActivityStatusesResponseDocument resDoc = fc.getActivityStatuses(stats);
+
+ ActivityStatusType activityStatus = resDoc.getGetActivityStatusesResponse().getResponseArray()[0]
+ .getActivityStatus();
+ return activityStatus;
+ }
+
+ protected String formatStatusMessage(String activityUrl, String status) {
+ return String.format("Activity %s is %s.\n", activityUrl, status);
+ }
+
+ protected String subStatusAsString(ActivityStatusType statusType) {
+
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(statusType.getState().toString());
+
+ XmlCursor acursor = statusType.newCursor();
+ if (acursor.toFirstChild()) {
+ do {
+ if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+ sb.append(":");
+ sb.append(acursor.getName().getLocalPart());
+ }
+ } while (acursor.toNextSibling());
+ acursor.dispose();
+ return sb.toString();
+ } else {
+ acursor.dispose();
+ return sb.toString();
+ }
+
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+
+ protected KeyAndCertCredential generateShortLivedCertificate(String userDN, String caCertPath, String caKeyPath,
+ String caPwd) throws Exception {
+ final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes
+ // ago
+
+ final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset;
+ final long endTime = startTime + 30 * 3600 * 1000;
+
+ String keyLengthProp = "1024";
+ int keyLength = Integer.parseInt(keyLengthProp);
+ String signatureAlgorithm = "SHA1withRSA";
+
+ KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
+
+ KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
+ kpg.initialize(keyLength);
+ KeyPair pair = kpg.generateKeyPair();
+
+ X500Principal subjectDN = new X500Principal(userDN);
+ Random rand = new Random();
+
+ SubjectPublicKeyInfo publicKeyInfo;
+ try {
+ publicKeyInfo = SubjectPublicKeyInfo.getInstance(new ASN1InputStream(pair.getPublic().getEncoded())
+ .readObject());
+ } catch (IOException e) {
+ throw new InvalidKeyException("Can not parse the public key"
+ + "being included in the short lived certificate", e);
+ }
+
+ X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
+
+ X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+ X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, new BigInteger(20, rand),
+ new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo);
+
+ AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
+
+ X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null);
+
+ certificate.checkValidity(new Date());
+ certificate.verify(caCred.getCertificate().getPublicKey());
+ KeyAndCertCredential result = new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
+ caCred.getCertificate() });
+
+ return result;
+ }
+
+ private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception {
+ InputStream isKey = new FileInputStream(caKeyPath);
+ PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, password.toCharArray());
+
+ InputStream isCert = new FileInputStream(caCertPath);
+ X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
+
+ if (isKey != null)
+ isKey.close();
+ if (isCert != null)
+ isCert.close();
+
+ return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+ }
+
+ private String getCNFromUserDN(String userDN) {
+ return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java
deleted file mode 100644
index b675c5e..0000000
--- a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gram.impl;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.util.Map;
-import java.util.MissingResourceException;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.JobSubmissionFault;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.gram.security.GSISecurityContext;
-import org.apache.airavata.gfac.gram.util.GramJobSubmissionListener;
-import org.apache.airavata.gfac.gram.util.GramProviderUtils;
-import org.apache.airavata.gfac.notification.events.JobIDEvent;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
-import org.globus.gram.GramException;
-import org.globus.gram.GramJob;
-import org.globus.gram.WaitingForCommitException;
-import org.globus.gram.internal.GRAMConstants;
-import org.globus.gram.internal.GRAMProtocolErrorConstants;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GramProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
-
- private GramJob job;
- private GramJobSubmissionListener listener;
- private boolean twoPhase = true;
-
- /**
- * If normal job submission fail due to an authorisation failure or script failure we
- * will re-attempt to submit the job. In-order to avoid any recursive loop during a continuous
- * failure we track whether failure paths are tried or not. Following booleans keeps track whether
- * we already tried a failure path or not.
- */
- /**
- * To track job submissions during a authorisation failure while requesting job.
- */
- private boolean renewCredentialsAttempt = false;
- /**
- * To track job submission during a script error situation.
- */
- private boolean reSubmissionInProgress = false;
- /**
- * To track authorisation failures during status monitoring.
- */
- private boolean authorisationFailedAttempt = false;
-
- private static final Map<String, GramJob> currentlyExecutingJobCache
- = new ConcurrentHashMap<String, GramJob>();
-
- private static Properties resources;
-
- static {
- try {
-
- String propFileName = "errors.properties";
- resources = new Properties();
- InputStream inputStream = GramProvider.class.getClassLoader()
- .getResourceAsStream(propFileName);
-
- if (inputStream == null) {
- throw new FileNotFoundException("property file '" + propFileName
- + "' not found in the classpath");
- }
-
- resources.load(inputStream);
-
- } catch (FileNotFoundException mre) {
- log.error("errors.properties not found", mre);
- } catch (IOException e) {
- log.error("Error reading errors.properties file", e);
- }
- }
-
-
- // This method prepare the environment before the application invocation.
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-
- try {
- super.initialize(jobExecutionContext);
- String strTwoPhase = ServerSettings.getSetting("TwoPhase");
- if (strTwoPhase != null) {
- twoPhase = Boolean.parseBoolean(strTwoPhase);
- log.info("Two phase commit is set to " + twoPhase);
- }
- } catch (ApplicationSettingsException e) {
- log.warn("Error reading TwoPhase property from configurations.", e);
- }
-
- job = GramProviderUtils.setupEnvironment(jobExecutionContext, twoPhase);
- listener = new GramJobSubmissionListener(job, jobExecutionContext);
- job.addListener(listener);
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException{
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
-
- StringBuilder stringBuilder = new StringBuilder();
- try {
-
- GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
- getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
- job.setCredentials(gssCred);
- // We do not support multiple gatekeepers in XBaya GUI, so we simply pick the 0th element in the array
- String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
- log.info("Request to contact:" + gateKeeper);
-
- stringBuilder.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
- .append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
- .append(" temp directory = ").append(app.getScratchWorkingDirectory())
- .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
-
- log.info(stringBuilder.toString());
-
- submitJobs(gateKeeper, jobExecutionContext, host);
-
- } catch (ApplicationSettingsException e) {
- throw new GFacException(e.getMessage(), e);
- } finally {
- if (job != null) {
- try {
- /*
- * Remove listener
- */
- job.removeListener(listener);
- } catch (Exception e) {
- log.error(e.getMessage());
- }
- }
- }
- }
-
- private void submitJobs(String gateKeeper,
- JobExecutionContext jobExecutionContext,
- GlobusHostType globusHostType) throws GFacException, GFacProviderException {
- boolean applicationSaved=false;
- String taskID = jobExecutionContext.getTaskData().getTaskID();
-
- if (twoPhase) {
- try {
- /*
- * The first boolean is to force communication through SSLv3
- * The second boolean is to specify the job is a batch job - use true for interactive and false for
- * batch.
- * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
- */
- job.request(true, gateKeeper, false, false);
-
- // Single boolean to track all authentication failures, therefore we need to re-initialize
- // this here
- renewCredentialsAttempt = false;
-
- } catch (WaitingForCommitException e) {
- String jobID = job.getIDAsString();
-
- details.setJobID(jobID);
- details.setJobDescription(job.getRSL());
- jobExecutionContext.setJobDetails(details);
- GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.UN_SUBMITTED);
-
- applicationSaved=true;
- String jobStatusMessage = "Un-submitted JobID= " + jobID;
- log.info(jobStatusMessage);
- jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
-
- log.info("Two phase commit: sending COMMIT_REQUEST signal; Job id - " + jobID);
-
- try {
- job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
-
- } catch (GramException gramException) {
- throw new GFacException("Error while sending commit request. Job Id - "
- + job.getIDAsString(), gramException);
- } catch (GSSException gssException) {
-
- // User credentials are invalid
- log.error("Error while submitting commit request - Credentials provided are invalid. Job Id - "
- + job.getIDAsString(), e);
- log.info("Attempting to renew credentials and re-submit commit signal...");
- GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- renewCredentials(jobExecutionContext);
-
- try {
- job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
- } catch (GramException e1) {
- GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacException("Error while sending commit request. Job Id - "
- + job.getIDAsString(), e1);
- } catch (GSSException e1) {
- GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacException("Error while sending commit request. Job Id - "
- + job.getIDAsString() + ". Credentials provided invalid", e1);
- }
- }
- GFacUtils.updateJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
- jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
- log.info(jobStatusMessage);
- jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
-
- } catch (GSSException e) {
- // Renew credentials and re-submit
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-
- reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
-
- } catch (GramException e) {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-
- throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
- }
- } else {
-
- /*
- * The first boolean is to force communication through SSLv3
- * The second boolean is to specify the job is a batch job - use true for interactive and false for
- * batch.
- * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
- */
- try {
-
- job.request(true, gateKeeper, false, false);
- renewCredentialsAttempt = false;
-
- } catch (GramException e) {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
- } catch (GSSException e) {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- // Renew credentials and re-submit
- reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
- }
-
- String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
- log.info(jobStatusMessage);
- jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
-
- }
-
- currentlyExecutingJobCache.put(job.getIDAsString(), job);
- /*
- * Wait until job is done
- */
- listener.waitFor();
-
- checkJobStatus(jobExecutionContext, globusHostType, gateKeeper);
-
- }
-
- private void renewCredentials(JobExecutionContext jobExecutionContext) throws GFacException {
-
- renewCredentials(this.job, jobExecutionContext);
- }
-
- private void renewCredentials(GramJob gramJob, JobExecutionContext jobExecutionContext) throws GFacException {
-
- try {
- GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
- getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).renewCredentials();
- gramJob.renew(gssCred);
- } catch (GramException e1) {
- throw new GFacException("Unable to renew credentials. Job Id - "
- + gramJob.getIDAsString(), e1);
- } catch (GSSException e1) {
- throw new GFacException("Unable to renew credentials. Job Id - "
- + gramJob.getIDAsString(), e1);
- } catch (ApplicationSettingsException e) {
- throw new GFacException(e.getLocalizedMessage(), e);
- }
- }
-
- private void reSubmitJob(String gateKeeper,
- JobExecutionContext jobExecutionContext,
- GlobusHostType globusHostType, Exception e) throws GFacException, GFacProviderException {
-
- if (!renewCredentialsAttempt) {
-
- renewCredentialsAttempt = true;
-
- // User credentials are invalid
- log.error("Error while submitting job - Credentials provided are invalid. Job Id - "
- + job.getIDAsString(), e);
- log.info("Attempting to renew credentials and re-submit jobs...");
-
- // Remove existing listener and register a new listener
- job.removeListener(listener);
- listener = new GramJobSubmissionListener(job, jobExecutionContext);
-
- job.addListener(listener);
-
- renewCredentials(jobExecutionContext);
-
- submitJobs(gateKeeper, jobExecutionContext, globusHostType);
-
- } else {
- throw new GFacException("Error while submitting job - Credentials provided are invalid. Job Id - "
- + job.getIDAsString(), e);
- }
-
- }
-
- private void reSubmitJob(String gateKeeper,
- JobExecutionContext jobExecutionContext,
- GlobusHostType globusHostType) throws GFacException, GFacProviderException {
-
- // User credentials are invalid
- log.info("Attempting to renew credentials and re-submit jobs...");
-
- // Remove existing listener and register a new listener
- job.removeListener(listener);
- listener = new GramJobSubmissionListener(job, jobExecutionContext);
-
- job.addListener(listener);
-
- renewCredentials(jobExecutionContext);
-
- submitJobs(gateKeeper, jobExecutionContext, globusHostType);
-
- }
-
-
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- }
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- cancelSingleJob(jobId, jobExecutionContext);
- }
-
-
- private void cancelSingleJob(String jobId, JobExecutionContext context) throws GFacException {
- // First check whether job id is in the cache
- if (currentlyExecutingJobCache.containsKey(jobId)) {
-
- synchronized (this) {
- GramJob gramJob = currentlyExecutingJobCache.get(jobId);
-
- // Even though we check using containsKey, at this point job could be null
- if (gramJob != null && (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
- gramJob.getStatus() != GRAMConstants.STATUS_FAILED)) {
- cancelJob(gramJob, context);
- }
- }
-
- } else {
-
- try {
- GSSCredential gssCred = ((GSISecurityContext)context.
- getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
-
- GramJob gramJob = new GramJob(null);
- try {
- gramJob.setID(jobId);
- } catch (MalformedURLException e) {
- throw new GFacException("Invalid job id - " + jobId, e);
- }
- gramJob.setCredentials(gssCred);
-
- synchronized (this) {
- if (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
- gramJob.getStatus() != GRAMConstants.STATUS_FAILED) {
- cancelJob(gramJob, context);
- }
- }
- } catch (ApplicationSettingsException e) {
- throw new GFacException(e);
- }
- }
- }
-
- private void cancelJob(GramJob gramJob, JobExecutionContext context) throws GFacException{
-
- try {
- gramJob.cancel();
- } catch (GramException e) {
- throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e);
- } catch (GSSException e) {
-
- log.warn("Credentials invalid to cancel job. Attempting to renew credentials and re-try. " +
- "Job id - " + gramJob.getIDAsString());
- renewCredentials(gramJob, context);
-
- try {
- gramJob.cancel();
- gramJob.signal(GramJob.SIGNAL_COMMIT_END);
- } catch (GramException e1) {
- throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e1);
- } catch (GSSException e1) {
- throw new GFacException("Error cancelling job, invalid credentials. Job id - "
- + gramJob.getIDAsString(), e);
- }
- }
-
- }
-
- public void initProperties(Map<String, String> properties) throws GFacException {
-
- }
-
- private void checkJobStatus(JobExecutionContext jobExecutionContext, GlobusHostType host, String gateKeeper)
- throws GFacProviderException {
- int jobStatus = listener.getCurrentStatus();
-
- if (jobStatus == GramJob.STATUS_FAILED) {
-
- String errorMsg = "Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " Job Exit Code = "
- + listener.getError() + " Error Description = " + getGramErrorString(listener.getError());
-
- if (listener.getError() == GRAMProtocolErrorConstants.INVALID_SCRIPT_REPLY) {
-
- // re-submitting without renewing
- // TODO verify why we re-submit jobs when we get a invalid script reply
- if (!reSubmissionInProgress) {
- reSubmissionInProgress = true;
-
- log.info("Invalid script reply received. Re-submitting job, id - " + job.getIDAsString());
- try {
- reSubmitJob(gateKeeper, jobExecutionContext, host);
- } catch (GFacException e) {
- throw new GFacProviderException
- ("Error during re-submission. Original job submission data - " + errorMsg, e);
- }
- return;
- }
-
- } else if (listener.getError() == GRAMProtocolErrorConstants.ERROR_AUTHORIZATION) {
-
- // re-submit with renewed credentials
- if (!authorisationFailedAttempt) {
- authorisationFailedAttempt = true;
- log.info("Authorisation error contacting provider. Re-submitting job with renewed credentials.");
-
- try {
- renewCredentials(jobExecutionContext);
- reSubmitJob(gateKeeper, jobExecutionContext, host);
- } catch (GFacException e) {
- throw new GFacProviderException
- ("Error during re-submission. Original job submission data - " + errorMsg, e);
- }
-
- return;
- }
-
- } else if (listener.getError() == GRAMProtocolErrorConstants.USER_CANCELLED) {
-
- log.info("User successfully cancelled job id " + job.getIDAsString());
- return;
- }
-
-
-
- log.error(errorMsg);
-
- synchronized (this) {
- currentlyExecutingJobCache.remove(job.getIDAsString());
- }
-
- throw new JobSubmissionFault(new Exception(errorMsg), host.getHostAddress(), gateKeeper,
- job.getRSL(), jobExecutionContext, getGramErrorString(listener.getError()),
- listener.getError());
-
- } else if (jobStatus == GramJob.STATUS_DONE) {
- log.info("Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " is successfully executed.");
-
- synchronized (this) {
- currentlyExecutingJobCache.remove(job.getIDAsString());
- }
- }
- }
-
- public String getGramErrorString(int errorCode) {
-
- if (resources != null) {
- try {
- return resources.getProperty(String.valueOf(errorCode));
- } catch (MissingResourceException mre) {
- log.warn("Error reading globus error descriptions.", mre);
- return "Error code: " + errorCode;
- }
- } else {
- return "Error code: " + errorCode;
- }
-
- }
-
-}