You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/21 19:56:38 UTC
[24/51] [abbrv] fixing more packaing issue
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
new file mode 100644
index 0000000..b0fd9eb
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -0,0 +1,310 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.ssh.provider.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.CommandExecutor;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
+import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * Execute application using remote SSH
+ */
+public class SSHProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
+ private Cluster cluster;
+ private String jobID = null;
+ private String taskID = null;
+ // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
+ private boolean hpcType = false;
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ super.initialize(jobExecutionContext);
+ if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ taskID = jobExecutionContext.getTaskData().getTaskID();
+ if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+ jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ details.setJobID(taskID);
+ details.setJobDescription(remoteFile);
+ jobExecutionContext.setJobDetails(details);
+ JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
+ details.setJobDescription(jobDescriptor.toXML());
+
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
+ log.info(remoteFile);
+ try {
+ File runscript = createShellScript(jobExecutionContext);
+ cluster.scpTo(remoteFile, runscript.getAbsolutePath());
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+ }else{
+ hpcType = true;
+ }
+ }
+
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ if (!hpcType) {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ try {
+ /*
+ * Execute
+ */
+ String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ details.setJobDescription(execuable);
+
+// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
+ RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
+
+ StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
+
+ CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
+ String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
+
+ log.info("stdout=" + stdOutputString);
+
+// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ } finally {
+ if (cluster != null) {
+ try {
+ cluster.disconnect();
+ } catch (SSHApiException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+ } else {
+ try {
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ HostDescriptionType host = jobExecutionContext.getApplicationContext().
+ getHostDescription().getType();
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+ JobDetails jobDetails = new JobDetails();
+ String taskID = jobExecutionContext.getTaskData().getTaskID();
+ try {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ // This installed path is a mandetory field, because this could change based on the computing resource
+ JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+
+ log.info(jobDescriptor.toXML());
+
+ jobDetails.setJobDescription(jobDescriptor.toXML());
+
+ String jobID = cluster.submitBatchJob(jobDescriptor);
+ jobExecutionContext.setJobDetails(jobDetails);
+ if (jobID == null) {
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ } else {
+ jobDetails.setJobID(jobID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ }
+
+ } catch (SSHApiException e) {
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ } catch (Exception e) {
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ }
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+
+ }
+
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+
+ private File createShellScript(JobExecutionContext context) throws IOException {
+ ApplicationDeploymentDescriptionType app = context.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
+ + new Random().nextLong();
+
+ File shellScript = File.createTempFile(uniqueDir, "sh");
+ OutputStream out = new FileOutputStream(shellScript);
+
+ out.write("#!/bin/bash\n".getBytes());
+ out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
+ .getBytes());
+ // get the env of the host and the application
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+ Map<String, String> nv = new HashMap<String, String>();
+ if (env != null) {
+ for (int i = 0; i < env.length; i++) {
+ String key = env[i].getName();
+ String value = env[i].getValue();
+ nv.put(key, value);
+ }
+ }
+ for (Entry<String, String> entry : nv.entrySet()) {
+ log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
+ out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+
+ }
+
+ // prepare the command
+ final String SPACE = " ";
+ StringBuffer cmd = new StringBuffer();
+ cmd.append(app.getExecutableLocation());
+ cmd.append(SPACE);
+
+ MessageContext input = context.getInMessageContext();
+ ;
+ Map<String, Object> inputs = input.getParameters();
+ Set<String> keys = inputs.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+ for (String value : values) {
+ cmd.append(value);
+ cmd.append(SPACE);
+ }
+ } else {
+ String paramValue = MappingFactory.toString(actualParameter);
+ cmd.append(paramValue);
+ cmd.append(SPACE);
+ }
+ }
+ // We redirect the error and stdout to remote files, they will be read
+ // in later
+ cmd.append(SPACE);
+ cmd.append("1>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardOutput());
+ cmd.append(SPACE);
+ cmd.append("2>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardError());
+
+ String cmdStr = cmd.toString();
+ log.info("Command = " + cmdStr);
+ out.write((cmdStr + "\n").getBytes());
+ String message = "\"execuationSuceeded\"";
+ out.write(("echo " + message + "\n").getBytes());
+ out.close();
+
+ return shellScript;
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+ /**
+ * This method will read standard output and if there's any it will be parsed
+ * @param jobIDReaderCommandOutput
+ * @param errorMsg
+ * @return
+ * @throws SSHApiException
+ */
+ private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
+ String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
+ String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
+
+ if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
+ log.error("Standard Error output : " + stdErrorString);
+ throw new SSHApiException(errorMsg + stdErrorString);
+ }
+ return stdOutputString;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/integration-tests/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/integration-tests/src/test/resources/gfac-config.xml b/modules/integration-tests/src/test/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/integration-tests/src/test/resources/gfac-config.xml
+++ b/modules/integration-tests/src/test/resources/gfac-config.xml
@@ -24,7 +24,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
@@ -24,7 +24,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
+++ b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
@@ -24,7 +24,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>