You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/11/06 23:12:12 UTC
[airavata] branch develop updated: Supporting properties for parsers
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new eccf3bd Supporting properties for parsers
eccf3bd is described below
commit eccf3bdc65f0b9407cd3eda2f8b376b42a9e9aff
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Tue Nov 6 18:12:03 2018 -0500
Supporting properties for parsers
---
.../helix/impl/task/parsing/DataParsingTask.java | 75 ++++++++++++++--------
.../helix/impl/workflow/ParserWorkflowManager.java | 74 ++++++++++++++-------
2 files changed, 99 insertions(+), 50 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
index 03f9a56..90d499a 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
@@ -65,8 +65,11 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Implementation of the data parsing task.
@@ -111,54 +114,63 @@ public class DataParsingTask extends AbstractTask {
logger.info("Downloading input files to local input directory");
+ Map<String, String> properties = new HashMap<>();
for (ParserInput parserInput : parser.getInputFiles()) {
Optional<ParsingTaskInput> filteredInputOptional = parsingTaskInputs.getInputs().stream().filter(inp -> parserInput.getId().equals(inp.getId())).findFirst();
if (filteredInputOptional.isPresent()) {
ParsingTaskInput parsingTaskInput = filteredInputOptional.get();
- String inputDataProductUri = parsingTaskInput.getValue() != null ? parsingTaskInput.getValue() : getContextVariable(parsingTaskInput.getContextVariableName());
- if (inputDataProductUri == null || inputDataProductUri.isEmpty()) {
- logger.error("Data product uri could not be null or empty for input " + parsingTaskInput.getId()
- + " with name " + parserInput.getName());
- throw new TaskOnFailException("Data product uri could not be null or empty for input "
- + parsingTaskInput.getId() + " with name " + parserInput.getName(), true, null);
- }
- DataProductModel inputDataProduct = getRegistryServiceClient().getDataProduct(inputDataProductUri);
- List<DataReplicaLocationModel> replicaLocations = inputDataProduct.getReplicaLocations();
+ String inputVal = parsingTaskInput.getValue() != null ? parsingTaskInput.getValue() : getContextVariable(parsingTaskInput.getContextVariableName());
+
+ if ("PROPERTY".equals(parsingTaskInput.getType())) {
+ properties.put(parsingTaskInput.getContextVariableName(), inputVal);
+ } else if ("FILE".equals(parsingTaskInput.getType())) {
- boolean downloadPassed = false;
+ String inputDataProductUri = inputVal;
+
+ if (inputDataProductUri == null || inputDataProductUri.isEmpty()) {
+ logger.error("Data product uri could not be null or empty for input " + parsingTaskInput.getId()
+ + " with name " + parserInput.getName());
+ throw new TaskOnFailException("Data product uri could not be null or empty for input "
+ + parsingTaskInput.getId() + " with name " + parserInput.getName(), true, null);
+ }
+ DataProductModel inputDataProduct = getRegistryServiceClient().getDataProduct(inputDataProductUri);
+ List<DataReplicaLocationModel> replicaLocations = inputDataProduct.getReplicaLocations();
- for (DataReplicaLocationModel replicaLocationModel : replicaLocations) {
- String storageResourceId = replicaLocationModel.getStorageResourceId();
- String remoteFilePath = new URI(replicaLocationModel.getFilePath()).getPath();
- String localFilePath = localInputDir + (localInputDir.endsWith(File.separator)? "" : File.separator)
- + parserInput.getName();
+ boolean downloadPassed = false;
- downloadPassed = downloadFileFromStorageResource(storageResourceId, remoteFilePath, localFilePath, helper.getAdaptorSupport());
+ for (DataReplicaLocationModel replicaLocationModel : replicaLocations) {
+ String storageResourceId = replicaLocationModel.getStorageResourceId();
+ String remoteFilePath = new URI(replicaLocationModel.getFilePath()).getPath();
+ String localFilePath = localInputDir + (localInputDir.endsWith(File.separator) ? "" : File.separator)
+ + parserInput.getName();
- if (downloadPassed) {
- break;
+ downloadPassed = downloadFileFromStorageResource(storageResourceId, remoteFilePath, localFilePath, helper.getAdaptorSupport());
+
+ if (downloadPassed) {
+ break;
+ }
}
- }
- if (!downloadPassed) {
- logger.error("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri);
- throw new TaskOnFailException("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri, true, null);
+ if (!downloadPassed) {
+ logger.error("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri);
+ throw new TaskOnFailException("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri, true, null);
+ }
}
} else {
if (parserInput.isRequiredInput()) {
- logger.error("File download info with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available");
- throw new TaskOnFailException("File download info with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available", true, null);
+ logger.error("Parser input with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available");
+ throw new TaskOnFailException("Parser input with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available", true, null);
} else {
- logger.warn("File download info with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available. But it is not required");
+ logger.warn("Parser input with id with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available. But it is not required");
}
}
}
logger.info("Running container with id " + containerId + " local input dir " + localInputDir + " local output dir " + localOutDir);
- runContainer(parser, containerId, localInputDir, localOutDir);
+ runContainer(parser, containerId, localInputDir, localOutDir, properties);
for (ParserOutput parserOutput : parser.getOutputFiles()) {
@@ -211,14 +223,21 @@ public class DataParsingTask extends AbstractTask {
}
- private void runContainer(Parser parser, String containerId, String localInputDir, String localOutputDir) {
+ private void runContainer(Parser parser, String containerId, String localInputDir, String localOutputDir, Map<String, String> properties) {
DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();
DockerClient dockerClient = DockerClientBuilder.getInstance(config).build();
CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parser.getImageName()).withCmd("/bin/sh", "-c", parser.getExecutionCommand()).withName(containerId)
.withBinds(Bind.parse(localInputDir + ":" + parser.getInputDirPath()),
- Bind.parse(localOutputDir + ":" + parser.getOutputDirPath())).withTty(true).withAttachStdin(true).withAttachStdout(true).exec();
+ Bind.parse(localOutputDir + ":" + parser.getOutputDirPath()))
+ .withTty(true)
+ .withAttachStdin(true)
+ .withAttachStdout(true).withEnv(properties.entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue())
+ .collect(Collectors.toList()))
+ .exec();
logger.info("Created the container with id " + containerResponse.getId());
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 60403d2..309ed13 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -31,10 +31,7 @@ import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInputs;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskOutput;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskOutputs;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.parser.Parser;
-import org.apache.airavata.model.appcatalog.parser.ParserConnector;
-import org.apache.airavata.model.appcatalog.parser.ParsingTemplate;
-import org.apache.airavata.model.appcatalog.parser.ParsingTemplateInput;
+import org.apache.airavata.model.appcatalog.parser.*;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.process.ProcessModel;
@@ -194,20 +191,38 @@ public class ParserWorkflowManager extends WorkflowManager {
ParsingTaskInputs inputs = new ParsingTaskInputs();
for (ParsingTemplateInput templateInput : templateInputs) {
- String expression = templateInput.getApplicationOutputName();
- try {
- ExperimentModel experiment = registryClient.getExperiment(completionMessage.getExperimentId());
- Optional<OutputDataObjectType> outputDataObj = experiment.getExperimentOutputs().stream().filter(outputDataObjectType -> outputDataObjectType.getName().equals(expression)).findFirst();
- if (outputDataObj.isPresent()) {
- ParsingTaskInput input = new ParsingTaskInput();
- input.setId(templateInput.getInputId());
- input.setValue(outputDataObj.get().getValue());
- inputs.addInput(input);
+
+ Optional<ParserInput> parserInputOp = parserInfo.getInputFiles().stream()
+ .filter(inp -> inp.getId().equals(templateInput.getTargetInputId())).findFirst();
+
+ ParsingTaskInput input = new ParsingTaskInput();
+ input.setId(templateInput.getTargetInputId());
+ if (parserInputOp.isPresent()) {
+ input.setType(parserInputOp.get().getType().name());
+ } else {
+ throw new Exception("Failed to find an input with id " + templateInput.getTargetInputId());
+ }
+
+ if(templateInput.getApplicationOutputName() != null) {
+ String applicationOutputName = templateInput.getApplicationOutputName();
+ try {
+ ExperimentModel experiment = registryClient.getExperiment(completionMessage.getExperimentId());
+ Optional<OutputDataObjectType> expOutputData = experiment.getExperimentOutputs().stream()
+ .filter(outputDataObjectType -> outputDataObjectType.getName().equals(applicationOutputName)).findFirst();
+
+ if (expOutputData.isPresent()) {
+ input.setValue(expOutputData.get().getValue());
+ } else {
+ throw new Exception("Could not find an experiment output with name " + applicationOutputName);
+ }
+ } catch (TException e) {
+ logger.error("Failed while fetching experiment " + completionMessage.getExperimentId());
+ throw new Exception("Failed while fetching experiment " + completionMessage.getExperimentId());
}
- } catch (TException e) {
- logger.error("Failed while fetching experiment " + completionMessage.getExperimentId());
- throw new Exception("Failed while fetching experiment " + completionMessage.getExperimentId());
+ } else {
+ input.setValue(processExpression(templateInput.getValue()));
}
+ inputs.addInput(input);
}
parsingTask.setParsingTaskInputs(inputs);
@@ -226,6 +241,10 @@ public class ParserWorkflowManager extends WorkflowManager {
return parsingTask;
}
+ private String processExpression(String expression) {
+ return expression;
+ }
+
private void createParserDagRecursively(List<AbstractTask> allTasks, Parser parentParserInfo, DataParsingTask parentTask, Map<String, Set<ParserConnector>> parentToChild,
ProcessCompletionMessage completionMessage, RegistryService.Client registryClient) throws Exception {
if (parentToChild.containsKey(parentParserInfo.getId())) {
@@ -245,12 +264,23 @@ public class ParserWorkflowManager extends WorkflowManager {
}
ParsingTaskInputs inputs = new ParsingTaskInputs();
- connector.getConnectorInputs().forEach(mapping -> {
- ParsingTaskInput input = new ParsingTaskInput();
- input.setContextVariableName(connector.getParentParserId() + "-" + mapping.getParentOutputId());
- input.setId(mapping.getInputId());
- inputs.addInput(input);
- });
+ for(ParserConnectorInput connectorInput : connector.getConnectorInputs()) {
+
+ Optional<ParserInput> parserInputOp = childParserInfo.getInputFiles().stream()
+ .filter(inp -> inp.getId().equals(connectorInput.getInputId())).findFirst();
+
+ if (parserInputOp.isPresent()) {
+ ParsingTaskInput input = new ParsingTaskInput();
+ // Either context variable or value is set
+ input.setContextVariableName(connector.getParentParserId() + "-" + connectorInput.getParentOutputId());
+ input.setValue(processExpression(connectorInput.getValue()));
+ input.setId(connectorInput.getInputId());
+ input.setType(parserInputOp.get().getType().name());
+ inputs.addInput(input);
+ } else {
+ throw new Exception("Failed to find an input with id " + connectorInput.getId());
+ }
+ }
ParsingTaskOutputs outputs = new ParsingTaskOutputs();
childParserInfo.getOutputFiles().forEach(parserOutput -> {