You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/03/25 23:33:31 UTC
[1/2] airavata git commit: Stage in and out jureca inputs and output
from storage resoruce
Repository: airavata
Updated Branches:
refs/heads/master ab262c706 -> eeb101679
Stage in and out jureca inputs and output from storage resoruce
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7bde79be
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7bde79be
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7bde79be
Branch: refs/heads/master
Commit: 7bde79be3430afba12b2fc653a15b05afa568356
Parents: 5f484b9
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Mar 25 18:33:07 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Mar 25 18:33:07 2016 -0400
----------------------------------------------------------------------
.../gfac/impl/task/BESJobSubmissionTask.java | 137 +++++++++++++++++--
.../impl/task/utils/bes/DataTransferrer.java | 14 +-
2 files changed, 134 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/7bde79be/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
index 678924f..067bbe8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
@@ -21,23 +21,37 @@
package org.apache.airavata.gfac.impl.task;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
import de.fzj.unicore.bes.client.ActivityClient;
import de.fzj.unicore.bes.client.FactoryClient;
import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
import de.fzj.unicore.uas.client.StorageClient;
import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
import org.apache.airavata.gfac.impl.task.utils.bes.*;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.experiment.UserConfigurationDataModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
@@ -45,7 +59,6 @@ import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.registry.core.experiment.catalog.model.UserConfigurationData;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
@@ -56,7 +69,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3.x2005.x08.addressing.EndpointReferenceType;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Calendar;
+import java.util.List;
import java.util.Map;
public class BESJobSubmissionTask implements JobSubmissionTask {
@@ -64,6 +82,12 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
private DefaultClientConfiguration secProperties;
private String jobId;
+ private String hostName;
+ private String userName;
+ private String inputPath;
+ private int DEFAULT_SSH_PORT = 22;
+ private AuthenticationInfo authenticationInfo;
+
@Override
public JobStatus cancel(TaskContext taskcontext) throws TaskException {
return null;
@@ -77,12 +101,12 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
public TaskStatus execute(TaskContext taskContext) {
TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
StorageClient sc = null;
+ ProcessContext processContext = taskContext.getParentProcessContext();
// FIXME - use original output dir
- taskContext.getParentProcessContext().setOutputDir("");
-
+ setInputOutputLocations(processContext);
try {
if (secProperties == null) {
- secProperties = getSecurityConfig(taskContext.getParentProcessContext());
+ secProperties = getSecurityConfig(processContext);
} // try secProperties = secProperties.clone() if we can't use already initialized ClientConfigurations.
} catch (GFacException e) {
String msg = "Unicorn security context initialization error";
@@ -93,7 +117,6 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
}
try {
- ProcessContext processContext = taskContext.getParentProcessContext();
JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
JobSubmissionInterface jobSubmissionInterface = GFacUtils.getPreferredJobSubmissionInterface(processContext);
String factoryUrl = null;
@@ -117,6 +140,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
+ // copy files to local
+ copyInputFilesToLocal(taskContext);
// upload files if any
DataTransferrer dt = new DataTransferrer(processContext, sc);
dt.uploadLocalFiles();
@@ -162,6 +187,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
// now use the activity working directory property
dt.setStorageClient(activityClient.getUspaceClient());
+ List<OutputDataObjectType> copyOutput = null;
+
if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
String error = activityStatus.getFault().getFaultcode()
.getLocalPart()
@@ -177,7 +204,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
//What if job is failed before execution and there are not stdouts generated yet?
log.debug("Downloading any standard output and error files, if they were produced.");
- dt.downloadRemoteFiles();
+ copyOutput = dt.downloadRemoteFiles();
} else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
JobState applicationJobStatus = JobState.CANCELED;
@@ -200,22 +227,110 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
// } else {
// dt.downloadStdOuts();
// }
- dt.downloadRemoteFiles();
+ copyOutput = dt.downloadRemoteFiles();
}
-
- dt.publishFinalOutputs();
+ if (copyOutput != null) {
+ copyOutputFilesToStorage(taskContext, copyOutput);
+ for (OutputDataObjectType outputDataObjectType : copyOutput) {
+ GFacUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
+ }
+ }
+// dt.publishFinalOutputs();
taskStatus.setState(TaskState.COMPLETED);
} catch (AppCatalogException e) {
- log.error("Error while retrieving UNICORE job submission..");
+ log.error("Error while retrieving UNICORE job submission.." , e);
taskStatus.setState(TaskState.FAILED);
} catch (Exception e) {
- log.error("Cannot create storage..", e);
+ log.error("BES task failed... ", e);
taskStatus.setState(TaskState.FAILED);
}
return taskStatus;
}
+ private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws GFacException {
+ ProcessContext pc = taskContext.getParentProcessContext();
+ String remoteFilePath = null, fileName = null, localFilePath = null;
+ try {
+ authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+ ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+ Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+
+ for (OutputDataObjectType output : copyOutput) {
+ switch (output.getType()) {
+ case STDERR: case STDOUT: case STRING: case URI:
+ localFilePath = output.getValue();
+ if (localFilePath.indexOf("://") != -1) {
+ localFilePath = localFilePath.substring(localFilePath.indexOf("://") + 2, localFilePath.length());
+ }
+ fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
+ URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+ remoteFilePath = destinationURI.getPath();
+// = pc.getInputDir() + File.separator + fileName;
+ SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession);
+ output.setValue("file:/" + destinationURI.getPath());
+ break;
+ default:
+ break;
+ }
+ }
+ } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) {
+ log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e);
+ throw new GFacException("Error while scp output files to remote storage file location", e);
+ }
+ }
+
+ private void copyInputFilesToLocal(TaskContext taskContext) throws GFacException {
+ ProcessContext pc = taskContext.getParentProcessContext();
+ StorageResourceDescription storageResource = pc.getStorageResource();
+ StoragePreference storagePreference = pc.getStoragePreference();
+
+ if (storageResource != null) {
+ hostName = storageResource.getHostName();
+ } else {
+ throw new GFacException("Storage Resource is null");
+ }
+
+ if (storagePreference != null) {
+ userName = storagePreference.getLoginUserName();
+ inputPath = storagePreference.getFileSystemRootLocation();
+ inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+ } else {
+ throw new GFacException("Storage Preference is null");
+ }
+
+ String remoteFilePath = null, fileName = null, localFilePath = null;
+ URI remoteFileURI = null;
+ try {
+ authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+ ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+ Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+
+ List<InputDataObjectType> processInputs = pc.getProcessModel().getProcessInputs();
+ for (InputDataObjectType input : processInputs) {
+ if (input.getType() == DataType.URI) {
+ remoteFileURI = new URI(input.getValue());
+ remoteFilePath = remoteFileURI.getPath();
+ fileName = remoteFilePath.substring(remoteFilePath.lastIndexOf("/") + 1);
+ localFilePath = pc.getInputDir() + File.separator + fileName;
+ SSHUtils.scpFrom(remoteFilePath, localFilePath, sshSession);
+ input.setValue("file:/" + localFilePath);
+ }
+ }
+ } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) {
+ log.error("Error while coping remote file " + remoteFilePath + " to local " + localFilePath, e);
+ throw new GFacException("Error while scp input files to local file location", e);
+ }
+ }
+
+ private void setInputOutputLocations(ProcessContext processContext) {
+ String localPath = System.getProperty("java.io.tmpdir") + File.separator + processContext.getProcessId();
+ new File(localPath).mkdir();
+
+ processContext.setInputDir(localPath);
+ processContext.setOutputDir(localPath);
+ }
+
private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws GFacException {
DefaultClientConfiguration clientConfig = null;
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/7bde79be/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
index b4bf9ed..08fa551 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
@@ -24,6 +24,7 @@ package org.apache.airavata.gfac.impl.task.utils.bes;
import de.fzj.unicore.uas.client.StorageClient;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
@@ -105,8 +106,8 @@ public class DataTransferrer {
public void uploadLocalFiles() throws GFacException {
List<String> inFilePrms = new ArrayList<>();
// FIXME - remove hard coded file path.
-// inFilePrms.addAll(extractInFileParams());
- inFilePrms.add("file://home/airavata/test/hpcinput-localhost-uslims3_cauma3d-00950.tar");
+ inFilePrms.addAll(extractInFileParams());
+// inFilePrms.add("file://home/airavata/test/hpcinput-localhost-uslims3_cauma3d-00950.tar");
for (String uri : inFilePrms) {
String fileName = new File(uri).getName();
if (uri.startsWith("file")) {
@@ -256,7 +257,7 @@ public class DataTransferrer {
return tmpOutputDir;
}
- public void downloadRemoteFiles() throws GFacException {
+ public List<OutputDataObjectType> downloadRemoteFiles() throws GFacException {
if(log.isDebugEnabled()) {
log.debug("Download location is:" + gatewayDownloadLocation);
@@ -269,12 +270,12 @@ public class DataTransferrer {
continue;
}
if(output.getType().equals(DataType.STDOUT)) {
- output.setValue(processContext.getStdoutLocation());
+ output.setValue(stdoutLocation);
resultantOutputsLst.add(output);
} else if(output.getType().equals(DataType.STDERR)) {
- output.setValue(processContext.getStderrLocation());
+ output.setValue(stderrLocation);
resultantOutputsLst.add(output);
- } else if (output.getType().equals(DataType.STRING)) {
+ } else if (output.getType().equals(DataType.URI)) {
String value = null;
if (!output.getLocation().isEmpty()) {
value = output.getLocation() + File.separator + output.getValue();
@@ -305,6 +306,7 @@ public class DataTransferrer {
}
downloadStdOuts();
+ return resultantOutputsLst;
}
[2/2] airavata git commit: Merge branch 'develop'
Posted by sh...@apache.org.
Merge branch 'develop'
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/eeb10167
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/eeb10167
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/eeb10167
Branch: refs/heads/master
Commit: eeb101679bc78eb902c11bcc4632db5e96264cc4
Parents: ab262c7 7bde79b
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Mar 25 18:33:22 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Mar 25 18:33:22 2016 -0400
----------------------------------------------------------------------
.../gfac/impl/task/BESJobSubmissionTask.java | 137 +++++++++++++++++--
.../impl/task/utils/bes/DataTransferrer.java | 14 +-
2 files changed, 134 insertions(+), 17 deletions(-)
----------------------------------------------------------------------