You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ms...@apache.org on 2015/06/23 15:10:38 UTC
airavata git commit: Trying to push outputs to registry so that PGA
can show the outputs, still in progress..
Repository: airavata
Updated Branches:
refs/heads/airavata-0.15-release-branch f18e34307 -> 1951bfd10
Trying to push outputs to registry so that PGA can show the outputs,
still in progress..
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1951bfd1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1951bfd1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1951bfd1
Branch: refs/heads/airavata-0.15-release-branch
Commit: 1951bfd107bb0811516415cddb720183ca6c92a4
Parents: f18e343
Author: msmemon <sh...@gmail.com>
Authored: Tue Jun 23 15:10:27 2015 +0200
Committer: msmemon <sh...@gmail.com>
Committed: Tue Jun 23 15:10:27 2015 +0200
----------------------------------------------------------------------
.../gfac/bes/provider/impl/BESProvider.java | 6 +-
.../gfac/bes/utils/DataTransferrer.java | 174 +++++++++----------
2 files changed, 85 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 3ed08cb..69e4915 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -49,6 +49,7 @@ import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.xmlbeans.XmlCursor;
import org.bouncycastle.asn1.x500.style.BCStyle;
import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
@@ -141,8 +142,6 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
-
-
// upload files if any
DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
dt.uploadLocalFiles();
@@ -228,7 +227,8 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
} catch (Exception e) {
log.error("Cannot create storage..");
throw new GFacProviderException("Cannot create storage..", e);
- }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
index 2dbf637..7de0a11 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -26,25 +26,21 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.DataType;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,11 +54,42 @@ public class DataTransferrer {
private StorageClient storageClient;
+ private List<OutputDataObjectType> resultantOutputsLst;
+
+ private String downloadLocation, stdoutLocation, stderrLocation;
+
public DataTransferrer(JobExecutionContext jobContext, StorageClient storageClient) {
this.jobContext = jobContext;
this.storageClient = storageClient;
+ resultantOutputsLst = new ArrayList<OutputDataObjectType>();
+ initStdoutsLocation();
}
+ private void initStdoutsLocation() {
+
+ downloadLocation = getDownloadLocation();
+
+ String stdout = jobContext.getStandardOutput();
+ String stderr = jobContext.getStandardError();
+ if(stdout != null) {
+ stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+ }
+
+ if(stderr != null) {
+ stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+ }
+
+ String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+ : stdout;
+ String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
+ : stderr;
+
+ stdoutLocation = downloadLocation+File.separator+stdoutFileName;
+
+ stderrLocation = downloadLocation+File.separator+stderrFileName;
+
+
+ }
public void uploadLocalFiles() throws GFacProviderException {
List<String> inFilePrms = extractInFileParams();
@@ -86,43 +113,13 @@ public class DataTransferrer {
}
+
/**
* This method will download all the remote files specified in the output
* context of a job.
* */
public void downloadRemoteFiles() throws GFacProviderException {
- String downloadLocation = getDownloadLocation();
-
- File file = new File(downloadLocation);
- if(!file.exists()){
- file.mkdirs();
- }
-
-// Map<String, Object> output = jobContext.getOutMessageContext().getParameters();
-// Set<String> keys = output.keySet();
-//
-// for (String outPrm : keys) {
-// OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(outPrm);
-// if (DataType.STDERR == actualParameter.getType() ||
-// DataType.STDOUT == actualParameter.getType() ||
-// DataType.URI == actualParameter.getType()) {
-// continue;
-// }
-//
-// String value = actualParameter.getValue();
-// FileDownloader fileDownloader = new FileDownloader(value,downloadLocation, Mode.overwrite);
-// try {
-// fileDownloader.perform(storageClient);
-// String outputPath = downloadLocation + File.separator + value.substring(value.lastIndexOf('/')+1);
-// actualParameter.setValue(outputPath);
-// actualParameter.setType(DataType.URI);
-// jobContext.addOutputFile(outputPath);
-// } catch (Exception e) {
-// throw new GFacProviderException(e.getLocalizedMessage(),e);
-// }
-// }
-
if(log.isDebugEnabled()) {
log.debug("Download location is:"+downloadLocation);
}
@@ -136,48 +133,36 @@ public class DataTransferrer {
if(output.getType().equals(DataType.STRING)) {
String value = output.getValue();
- FileDownloader fileDownloader = new FileDownloader(value,downloadLocation, Mode.overwrite);
+ String outputPath = downloadLocation + File.separator + value;
+ FileDownloader fileDownloader = new FileDownloader(value,outputPath, Mode.overwrite);
try {
fileDownloader.perform(storageClient);
- String outputPath = downloadLocation + File.separator + value;
- jobContext.addOutputFile(outputPath);
} catch (Exception e) {
- log.error("Cannot download remote files..");
+ log.error("Error downloading remote files..");
throw new GFacProviderException(e.getLocalizedMessage(),e);
}
+ resultantOutputsLst.add(output);
+ jobContext.addOutputFile(outputPath);
+ }
+
+ if(output.getType().equals(DataType.STDOUT)) {
+ resultantOutputsLst.add(output);
+ }
+
+ if(output.getType().equals(DataType.STDERR)) {
+ resultantOutputsLst.add(output);
}
}
}
-
downloadStdOuts();
}
-
public void downloadStdOuts() throws GFacProviderException{
- String downloadLocation = getDownloadLocation();
- File file = new File(downloadLocation);
- if(!file.exists()){
- file.mkdirs();
- }
- String stdout = jobContext.getStandardOutput();
- String stderr = jobContext.getStandardError();
- if(stdout != null) {
- stdout = stdout.substring(stdout.lastIndexOf('/')+1);
- }
+ String stdoutFileName = new File(stdoutLocation).getName();
- if(stderr != null) {
- stderr = stderr.substring(stderr.lastIndexOf('/')+1);
- }
+ String stderrFileName = new File(stderrLocation).getName();
- String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
- : stdout;
- String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
- : stderr;
-
- ApplicationDeploymentDescription application = jobContext.getApplicationContext().getApplicationDeploymentDescription();
-
- String stdoutLocation = downloadLocation+File.separator+stdoutFileName;
FileDownloader f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
try {
f1.perform(storageClient);
@@ -185,47 +170,45 @@ public class DataTransferrer {
String stdoutput = readFile(stdoutLocation);
jobContext.addOutputFile(stdoutLocation);
jobContext.setStandardOutput(stdoutLocation);
- log.info("Stdout downloaded to -> "+stdoutLocation);
+ log.info(stdoutFileName + " -> "+stdoutLocation);
if(UASDataStagingProcessor.isUnicoreEndpoint(jobContext)) {
String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
String scriptCodeLocation = downloadLocation+File.separator+scriptExitCodeFName;
f1.setFrom(scriptExitCodeFName);
f1.setTo(scriptCodeLocation);
f1.perform(storageClient);
- log.info("UNICORE_SCRIPT_EXIT_CODE downloaded to "+scriptCodeLocation);
+ jobContext.addOutputFile(scriptCodeLocation);
+ log.info("UNICORE_SCRIPT_EXIT_CODE -> "+scriptCodeLocation);
}
- String stderrLocation = downloadLocation+File.separator+stderrFileName;
+
f1.setFrom(stderrFileName);
f1.setTo(stderrLocation);
f1.perform(storageClient);
String stderror = readFile(stderrLocation);
jobContext.addOutputFile(stderrLocation);
jobContext.setStandardError(stderrLocation);
- log.info("Stderr downloaded to -> "+stderrLocation);
+ log.info(stderrFileName + " -> " + stderrLocation);
} catch (Exception e) {
-
throw new GFacProviderException(e.getLocalizedMessage(),e);
}
+ publishFinalOutputs();
}
- public List<String> extractOutParams(JobExecutionContext context) {
- List<String> outPrmsList = new ArrayList<String>();
- List<OutputDataObjectType> applicationOutputs = jobContext.getTaskData().getApplicationOutputs();
- if (applicationOutputs != null && !applicationOutputs.isEmpty()){
- for (OutputDataObjectType output : applicationOutputs){
- if(output.getType().equals(DataType.STRING)) {
- outPrmsList.add(output.getValue());
- }
- else if(output.getType().equals(DataType.FLOAT) || output.getType().equals(DataType.INTEGER)) {
- outPrmsList.add(String.valueOf(output.getValue()));
-
- }
- }
- }
- return outPrmsList;
+ protected void publishFinalOutputs() throws GFacProviderException {
+ try {
+ if(!resultantOutputsLst.isEmpty()) {
+ Registry registry = jobContext.getRegistry();
+ registry.add(ChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, jobContext.getExperimentID());
+ }
+ } catch (RegistryException e) {
+ throw new GFacProviderException("Cannot publish outputs to the registry.");
+ }
+
+
}
+
public List<String> extractInFileParams() {
List<String> filePrmsList = new ArrayList<String>();
List<InputDataObjectType> applicationInputs = jobContext.getTaskData().getApplicationInputs();
@@ -274,11 +257,6 @@ public class DataTransferrer {
else {
- try {
- URI u = new URI(outputDataDir);
- } catch (URISyntaxException e) {
- return getTempPath(jobContext.getExperimentID());
- }
// in case of remote locations use the tmp location
if (outputDataDir.startsWith("scp:") ||
outputDataDir.startsWith("ftp:") ||
@@ -289,12 +267,24 @@ public class DataTransferrer {
outputDataDir = getTempPath(jobContext.getExperimentID());
} else {
- outputDataDir = taskData.getAdvancedOutputDataHandling()
- .getOutputDataDir();
+ try {
+ URI u = new URI(outputDataDir);
+ outputDataDir = u.getPath();
+ } catch (URISyntaxException e) {
+ outputDataDir = getTempPath(jobContext.getExperimentID());
+ }
+
+
}
}
}
+
+ File file = new File(outputDataDir);
+ if(!file.exists()){
+ file.mkdirs();
+ }
+
return outputDataDir;
}