You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/03/25 23:33:31 UTC

[1/2] airavata git commit: Stage in and out jureca inputs and output from storage resoruce

Repository: airavata
Updated Branches:
  refs/heads/master ab262c706 -> eeb101679


Stage in and out jureca inputs and output from storage resoruce


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7bde79be
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7bde79be
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7bde79be

Branch: refs/heads/master
Commit: 7bde79be3430afba12b2fc653a15b05afa568356
Parents: 5f484b9
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Mar 25 18:33:07 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Mar 25 18:33:07 2016 -0400

----------------------------------------------------------------------
 .../gfac/impl/task/BESJobSubmissionTask.java    | 137 +++++++++++++++++--
 .../impl/task/utils/bes/DataTransferrer.java    |  14 +-
 2 files changed, 134 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/7bde79be/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
index 678924f..067bbe8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
@@ -21,23 +21,37 @@
 
 package org.apache.airavata.gfac.impl.task;
 
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
 import de.fzj.unicore.bes.client.ActivityClient;
 import de.fzj.unicore.bes.client.FactoryClient;
 import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
 import de.fzj.unicore.uas.client.StorageClient;
 import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
 import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.gfac.impl.task.utils.bes.*;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.experiment.UserConfigurationDataModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
@@ -45,7 +59,6 @@ import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.registry.core.experiment.catalog.model.UserConfigurationData;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.airavata.registry.cpi.RegistryException;
@@ -56,7 +69,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3.x2005.x08.addressing.EndpointReferenceType;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Calendar;
+import java.util.List;
 import java.util.Map;
 
 public class BESJobSubmissionTask implements JobSubmissionTask {
@@ -64,6 +82,12 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
     private DefaultClientConfiguration secProperties;
 
     private String jobId;
+    private String hostName;
+    private String userName;
+    private String inputPath;
+    private int DEFAULT_SSH_PORT = 22;
+    private AuthenticationInfo authenticationInfo;
+
     @Override
     public JobStatus cancel(TaskContext taskcontext) throws TaskException {
         return null;
@@ -77,12 +101,12 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
     public TaskStatus execute(TaskContext taskContext) {
         TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
         StorageClient sc = null;
+        ProcessContext processContext = taskContext.getParentProcessContext();
         // FIXME - use original output dir
-        taskContext.getParentProcessContext().setOutputDir("");
-
+        setInputOutputLocations(processContext);
         try {
             if (secProperties == null) {
-                secProperties = getSecurityConfig(taskContext.getParentProcessContext());
+                secProperties = getSecurityConfig(processContext);
             }  // try secProperties = secProperties.clone() if we can't use already initialized ClientConfigurations.
         } catch (GFacException e) {
             String msg = "Unicorn security context initialization error";
@@ -93,7 +117,6 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
         }
 
         try {
-            ProcessContext processContext = taskContext.getParentProcessContext();
             JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
             JobSubmissionInterface jobSubmissionInterface = GFacUtils.getPreferredJobSubmissionInterface(processContext);
             String factoryUrl = null;
@@ -117,6 +140,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
 
             log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
 
+            // copy files to local
+            copyInputFilesToLocal(taskContext);
             // upload files if any
             DataTransferrer dt = new DataTransferrer(processContext, sc);
             dt.uploadLocalFiles();
@@ -162,6 +187,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             // now use the activity working directory property
             dt.setStorageClient(activityClient.getUspaceClient());
 
+            List<OutputDataObjectType> copyOutput = null;
+
             if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
                 String error = activityStatus.getFault().getFaultcode()
                         .getLocalPart()
@@ -177,7 +204,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
 
                 //What if job is failed before execution and there are not stdouts generated yet?
                 log.debug("Downloading any standard output and error files, if they were produced.");
-                dt.downloadRemoteFiles();
+                copyOutput = dt.downloadRemoteFiles();
 
             } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
                 JobState applicationJobStatus = JobState.CANCELED;
@@ -200,22 +227,110 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
 //                } else {
 //                    dt.downloadStdOuts();
 //                }
-                dt.downloadRemoteFiles();
+                copyOutput = dt.downloadRemoteFiles();
             }
-
-            dt.publishFinalOutputs();
+            if (copyOutput != null) {
+                copyOutputFilesToStorage(taskContext, copyOutput);
+                for (OutputDataObjectType outputDataObjectType : copyOutput) {
+                    GFacUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
+                }
+            }
+//            dt.publishFinalOutputs();
             taskStatus.setState(TaskState.COMPLETED);
         } catch (AppCatalogException e) {
-            log.error("Error while retrieving UNICORE job submission..");
+            log.error("Error while retrieving UNICORE job submission.." , e);
             taskStatus.setState(TaskState.FAILED);
         } catch (Exception e) {
-            log.error("Cannot create storage..", e);
+            log.error("BES task failed... ", e);
             taskStatus.setState(TaskState.FAILED);
         }
 
         return taskStatus;
     }
 
+    private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws GFacException {
+        ProcessContext pc = taskContext.getParentProcessContext();
+        String remoteFilePath = null, fileName = null, localFilePath = null;
+        try {
+            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+
+            for (OutputDataObjectType output : copyOutput) {
+                switch (output.getType()) {
+                    case STDERR: case STDOUT: case STRING: case URI:
+                        localFilePath = output.getValue();
+                        if (localFilePath.indexOf("://") != -1) {
+                            localFilePath = localFilePath.substring(localFilePath.indexOf("://") + 2, localFilePath.length());
+                        }
+                        fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
+                        URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+                        remoteFilePath = destinationURI.getPath();
+//                        = pc.getInputDir() + File.separator + fileName;
+                        SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession);
+                        output.setValue("file:/" + destinationURI.getPath());
+                        break;
+                    default:
+                        break;
+                }
+            }
+        } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) {
+            log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e);
+            throw new GFacException("Error while scp output files to remote storage file location", e);
+        }
+    }
+
+    private void copyInputFilesToLocal(TaskContext taskContext) throws GFacException {
+        ProcessContext pc = taskContext.getParentProcessContext();
+        StorageResourceDescription storageResource = pc.getStorageResource();
+        StoragePreference storagePreference = pc.getStoragePreference();
+
+        if (storageResource != null) {
+            hostName = storageResource.getHostName();
+        } else {
+            throw new GFacException("Storage Resource is null");
+        }
+
+        if (storagePreference != null) {
+            userName = storagePreference.getLoginUserName();
+            inputPath = storagePreference.getFileSystemRootLocation();
+            inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+        } else {
+            throw new GFacException("Storage Preference is null");
+        }
+
+        String remoteFilePath = null, fileName = null, localFilePath = null;
+        URI remoteFileURI = null;
+        try {
+            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+
+            List<InputDataObjectType> processInputs = pc.getProcessModel().getProcessInputs();
+            for (InputDataObjectType input : processInputs) {
+                if (input.getType() == DataType.URI) {
+                    remoteFileURI = new URI(input.getValue());
+                    remoteFilePath = remoteFileURI.getPath();
+                    fileName = remoteFilePath.substring(remoteFilePath.lastIndexOf("/") + 1);
+                    localFilePath = pc.getInputDir() + File.separator + fileName;
+                    SSHUtils.scpFrom(remoteFilePath, localFilePath, sshSession);
+                    input.setValue("file:/" + localFilePath);
+                }
+            }
+        } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) {
+            log.error("Error while coping remote file " + remoteFilePath + " to local " + localFilePath, e);
+            throw new GFacException("Error while scp input files to local file location", e);
+        }
+    }
+
+    private void setInputOutputLocations(ProcessContext processContext) {
+        String localPath = System.getProperty("java.io.tmpdir") + File.separator + processContext.getProcessId();
+        new File(localPath).mkdir();
+
+        processContext.setInputDir(localPath);
+        processContext.setOutputDir(localPath);
+    }
+
     private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws GFacException {
         DefaultClientConfiguration clientConfig = null;
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/7bde79be/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
index b4bf9ed..08fa551 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
@@ -24,6 +24,7 @@ package org.apache.airavata.gfac.impl.task.utils.bes;
 import de.fzj.unicore.uas.client.StorageClient;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
@@ -105,8 +106,8 @@ public class DataTransferrer {
     public void uploadLocalFiles() throws GFacException {
         List<String> inFilePrms = new ArrayList<>();
         // FIXME - remove hard coded file path.
-//        inFilePrms.addAll(extractInFileParams());
-        inFilePrms.add("file://home/airavata/test/hpcinput-localhost-uslims3_cauma3d-00950.tar");
+        inFilePrms.addAll(extractInFileParams());
+//        inFilePrms.add("file://home/airavata/test/hpcinput-localhost-uslims3_cauma3d-00950.tar");
         for (String uri : inFilePrms) {
             String fileName = new File(uri).getName();
             if (uri.startsWith("file")) {
@@ -256,7 +257,7 @@ public class DataTransferrer {
 		return tmpOutputDir;
 	}
 
-    public void downloadRemoteFiles() throws GFacException {
+    public List<OutputDataObjectType> downloadRemoteFiles() throws GFacException {
 
         if(log.isDebugEnabled()) {
             log.debug("Download location is:" + gatewayDownloadLocation);
@@ -269,12 +270,12 @@ public class DataTransferrer {
                     continue;
                 }
                 if(output.getType().equals(DataType.STDOUT)) {
-                    output.setValue(processContext.getStdoutLocation());
+                    output.setValue(stdoutLocation);
                     resultantOutputsLst.add(output);
                 } else if(output.getType().equals(DataType.STDERR)) {
-                    output.setValue(processContext.getStderrLocation());
+                    output.setValue(stderrLocation);
                     resultantOutputsLst.add(output);
-                } else if (output.getType().equals(DataType.STRING)) {
+                } else if (output.getType().equals(DataType.URI)) {
                     String value = null;
                     if (!output.getLocation().isEmpty()) {
                         value = output.getLocation() + File.separator + output.getValue();
@@ -305,6 +306,7 @@ public class DataTransferrer {
         }
 
         downloadStdOuts();
+        return resultantOutputsLst;
 
     }
 


[2/2] airavata git commit: Merge branch 'develop'

Posted by sh...@apache.org.
Merge branch 'develop'


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/eeb10167
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/eeb10167
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/eeb10167

Branch: refs/heads/master
Commit: eeb101679bc78eb902c11bcc4632db5e96264cc4
Parents: ab262c7 7bde79b
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Mar 25 18:33:22 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Mar 25 18:33:22 2016 -0400

----------------------------------------------------------------------
 .../gfac/impl/task/BESJobSubmissionTask.java    | 137 +++++++++++++++++--
 .../impl/task/utils/bes/DataTransferrer.java    |  14 +-
 2 files changed, 134 insertions(+), 17 deletions(-)
----------------------------------------------------------------------