You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ms...@apache.org on 2015/06/30 15:50:37 UTC
airavata git commit: some data staging modifications to seamlessly
download and display job output files
Repository: airavata
Updated Branches:
refs/heads/airavata-0.15-release-branch 2bab70875 -> a080ff7b2
some data staging modifications to seamlessly download and display job
output files
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a080ff7b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a080ff7b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a080ff7b
Branch: refs/heads/airavata-0.15-release-branch
Commit: a080ff7b2fcada5497a84a9effdff19570c24945
Parents: 2bab708
Author: msmemon <sh...@gmail.com>
Authored: Tue Jun 30 15:50:26 2015 +0200
Committer: msmemon <sh...@gmail.com>
Committed: Tue Jun 30 15:50:26 2015 +0200
----------------------------------------------------------------------
.../gfac/bes/provider/impl/BESProvider.java | 7 +-
.../bes/security/UNICORESecurityContext.java | 14 ++-
.../gfac/bes/utils/DataTransferrer.java | 105 +++++++++++--------
.../gfac/bes/utils/UASDataStagingProcessor.java | 23 +++-
4 files changed, 93 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 69e4915..97cf2d3 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -109,7 +109,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
log.debug("Security properties initialized.");
}
}
-
+
public void execute(JobExecutionContext jobExecutionContext)
throws GFacProviderException, GFacException {
@@ -182,6 +182,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
ActivityClient activityClient;
activityClient = new ActivityClient(activityEpr, secProperties);
+ // now use the activity working directory property
dt.setStorageClient(activityClient.getUspaceClient());
if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
@@ -191,7 +192,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
+ activityStatus.getFault().getFaultstring()
+ "\n EXITCODE: " + activityStatus.getExitCode();
log.info(error);
-
+
JobState applicationJobStatus = JobState.FAILED;
sendNotification(jobExecutionContext,applicationJobStatus);
GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
@@ -221,6 +222,8 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
dt.downloadStdOuts();
}
}
+
+ dt.publishFinalOutputs();
} catch (AppCatalogException e) {
log.error("Error while retrieving UNICORE job submission..");
throw new GFacProviderException("Error while retrieving UNICORE job submission..", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
index e82beeb..20c493f 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
@@ -39,6 +39,7 @@ import org.bouncycastle.asn1.x500.style.BCStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import de.fzj.unicore.uas.security.ProxyCertOutHandler;
import eu.emi.security.authn.x509.X509Credential;
import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
import eu.emi.security.authn.x509.impl.X500NameUtils;
@@ -73,11 +74,7 @@ public class UNICORESecurityContext extends X509SecurityContext {
catch (Exception e) {
throw new GFacException(e.getMessage(), e);
}
- secProperties.getETDSettings().setExtendTrustDelegation(true);
if(enableMessageLogging) secProperties.setMessageLogging(true);
-// secProperties.setMessageLogging(true);
-// secProperties.setDoSignMessage(true);
- secProperties.getETDSettings().setIssuerCertificateChain(secProperties.getCredential().getCertificateChain());
return secProperties;
}
@@ -180,18 +177,19 @@ public class UNICORESecurityContext extends X509SecurityContext {
p.setProperty("http.connection.timeout", "5000");
p.setProperty("http.socket.timeout", "5000");
- secProperties.setExtraSettings(p);
-
if (outHandlers == null) {
outHandlerLst = new HashSet<String>();
} else {
outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
}
- outHandlerLst.add("de.fzj.unicore.uas.security.ProxyCertOutHandler");
-
+ outHandlerLst.add(ProxyCertOutHandler.class.getName());
+
secProperties.setOutHandlerClassNames(outHandlerLst
.toArray(new String[outHandlerLst.size()]));
+
+ secProperties.getETDSettings().setExtendTrustDelegation(true);
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
index 7de0a11..0c6b7a2 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -46,17 +46,21 @@ import org.slf4j.LoggerFactory;
import de.fzj.unicore.uas.client.StorageClient;
-
+/**
+ * Data movement utility class for transferring files before and after the job execution phase.
+ *
+ * */
public class DataTransferrer {
- protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
- private JobExecutionContext jobContext;
+ protected JobExecutionContext jobContext;
- private StorageClient storageClient;
+ protected StorageClient storageClient;
- private List<OutputDataObjectType> resultantOutputsLst;
+ protected List<OutputDataObjectType> resultantOutputsLst;
- private String downloadLocation, stdoutLocation, stderrLocation;
+ protected String gatewayDownloadLocation, stdoutLocation, stderrLocation;
public DataTransferrer(JobExecutionContext jobContext, StorageClient storageClient) {
this.jobContext = jobContext;
@@ -67,7 +71,7 @@ public class DataTransferrer {
private void initStdoutsLocation() {
- downloadLocation = getDownloadLocation();
+ gatewayDownloadLocation = getDownloadLocation();
String stdout = jobContext.getStandardOutput();
String stderr = jobContext.getStandardError();
@@ -84,10 +88,14 @@ public class DataTransferrer {
String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
: stderr;
- stdoutLocation = downloadLocation+File.separator+stdoutFileName;
+ stdoutLocation = gatewayDownloadLocation+File.separator+stdoutFileName;
- stderrLocation = downloadLocation+File.separator+stderrFileName;
-
+ stderrLocation = gatewayDownloadLocation+File.separator+stderrFileName;
+
+ jobContext.addOutputFile(stdoutLocation);
+ jobContext.setStandardOutput(stdoutLocation);
+ jobContext.addOutputFile(stderrLocation);
+ jobContext.setStandardError(stderrLocation);
}
@@ -121,7 +129,7 @@ public class DataTransferrer {
public void downloadRemoteFiles() throws GFacProviderException {
if(log.isDebugEnabled()) {
- log.debug("Download location is:"+downloadLocation);
+ log.debug("Download location is:"+gatewayDownloadLocation);
}
List<OutputDataObjectType> applicationOutputs = jobContext.getTaskData().getApplicationOutputs();
@@ -130,31 +138,37 @@ public class DataTransferrer {
if("".equals(output.getValue()) || output.getValue() == null) {
continue;
}
-
- if(output.getType().equals(DataType.STRING)) {
+ if(output.getType().equals(DataType.STDOUT)) {
+ output.setValue(jobContext.getStandardOutput());
+ resultantOutputsLst.add(output);
+ }
+
+ else if(output.getType().equals(DataType.STDERR)) {
+ output.setValue(jobContext.getStandardError());
+ resultantOutputsLst.add(output);
+ }
+ else if(output.getType().equals(DataType.STRING)) {
String value = output.getValue();
- String outputPath = downloadLocation + File.separator + value;
+ String outputPath = gatewayDownloadLocation + File.separator + value;
FileDownloader fileDownloader = new FileDownloader(value,outputPath, Mode.overwrite);
try {
fileDownloader.perform(storageClient);
+ output.setType(DataType.URI);
+ output.setValue(outputPath);
+ jobContext.addOutputFile(outputPath);
+ resultantOutputsLst.add(output);
} catch (Exception e) {
- log.error("Error downloading remote files..");
+ log.error("Error downloading "+value+" from job working directory. ");
throw new GFacProviderException(e.getLocalizedMessage(),e);
}
- resultantOutputsLst.add(output);
- jobContext.addOutputFile(outputPath);
}
- if(output.getType().equals(DataType.STDOUT)) {
- resultantOutputsLst.add(output);
- }
-
- if(output.getType().equals(DataType.STDERR)) {
- resultantOutputsLst.add(output);
- }
}
+
}
- downloadStdOuts();
+
+ downloadStdOuts();
+
}
public void downloadStdOuts() throws GFacProviderException{
@@ -163,41 +177,48 @@ public class DataTransferrer {
String stderrFileName = new File(stderrLocation).getName();
- FileDownloader f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
+ FileDownloader f1 = null;
try {
- f1.perform(storageClient);
log.info("Downloading stdout and stderr..");
- String stdoutput = readFile(stdoutLocation);
- jobContext.addOutputFile(stdoutLocation);
- jobContext.setStandardOutput(stdoutLocation);
log.info(stdoutFileName + " -> "+stdoutLocation);
+
+ f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
+ f1.perform(storageClient);
+ String stdoutput = readFile(stdoutLocation);
+
+ log.info(stderrFileName + " -> " + stderrLocation);
+ f1.setFrom(stderrFileName);
+ f1.setTo(stderrLocation);
+ f1.perform(storageClient);
+ String stderror = readFile(stderrLocation);
+
if(UASDataStagingProcessor.isUnicoreEndpoint(jobContext)) {
String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
- String scriptCodeLocation = downloadLocation+File.separator+scriptExitCodeFName;
+ String scriptCodeLocation = gatewayDownloadLocation+File.separator+scriptExitCodeFName;
f1.setFrom(scriptExitCodeFName);
f1.setTo(scriptCodeLocation);
f1.perform(storageClient);
jobContext.addOutputFile(scriptCodeLocation);
log.info("UNICORE_SCRIPT_EXIT_CODE -> "+scriptCodeLocation);
+ log.info("EXIT CODE: "+ readFile(scriptCodeLocation));
}
-
- f1.setFrom(stderrFileName);
- f1.setTo(stderrLocation);
- f1.perform(storageClient);
- String stderror = readFile(stderrLocation);
- jobContext.addOutputFile(stderrLocation);
- jobContext.setStandardError(stderrLocation);
- log.info(stderrFileName + " -> " + stderrLocation);
} catch (Exception e) {
throw new GFacProviderException(e.getLocalizedMessage(),e);
}
- publishFinalOutputs();
}
- protected void publishFinalOutputs() throws GFacProviderException {
+ /**
+ * This method should be called once all the output files are successfully
+ * transferred from the remote Unicore endpoint.
+ * Its access is made public to give clients a room for
+ * graceful invocation after the required outputs are downloaded.
+ *
+ * */
+ public void publishFinalOutputs() throws GFacProviderException {
try {
- if(!resultantOutputsLst.isEmpty()) {
+ if(!resultantOutputsLst.isEmpty()) {
+ log.debug("Publishing the list of outputs to the registry instance..");
Registry registry = jobContext.getRegistry();
registry.add(ChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, jobContext.getExperimentID());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
index 5de9593..0b6d62c 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
@@ -23,15 +23,21 @@ package org.apache.airavata.gfac.bes.utils;
import java.io.File;
import java.util.List;
+
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.model.appcatalog.appinterface.DataType;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class UASDataStagingProcessor {
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+
public static void generateDataStagingElements(JobDefinitionType value, JobExecutionContext context, String smsUrl) throws Exception{
smsUrl = "BFT:"+smsUrl;
@@ -44,10 +50,11 @@ public class UASDataStagingProcessor {
}
}
- private static void createInURISMSElement(JobDefinitionType value, String smsUrl, String uri)
+ private static void createInURISMSElement(JobDefinitionType value, String smsUrl, String uri, boolean useSMS)
throws Exception {
String fileName = new File(uri).getName();
- if (uri.startsWith("file")) {
+
+ if (useSMS && uri.startsWith("file:")) {
uri = smsUrl+"#/"+fileName;
}
@@ -136,9 +143,17 @@ public class UASDataStagingProcessor {
continue;
}
if(input.getType().equals(DataType.URI)){
- createInURISMSElement(value, smsUrl, input.getValue());
+ createInURISMSElement(value, smsUrl, input.getValue(), false);
+ }
+ else if(input.getType().equals(DataType.STRING) && input.isDataStaged()){
+ System.out.println("INPUT DATA VALUE-SBZ: "+input.getValue());
+ System.out.println("INPUT DATA flag-SBZ: "+input.isDataStaged());
+
+ createInURISMSElement(value, smsUrl, input.getValue(), false);
}
- else if(input.getType().equals(DataType.STRING)){
+ else if(input.getType().equals(DataType.STRING) && !input.isDataStaged()){
+ System.out.println("INPUT DATA value for arg-SBZ: "+input.getValue());
+ System.out.println("INPUT DATA flag for arg-SBZ: "+input.isDataStaged());
ApplicationProcessor.addApplicationArgument(value, context, input.getValue());
}
else if (input.getType().equals(DataType.FLOAT) || input.getType().equals(DataType.INTEGER)){