You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/11/06 23:12:12 UTC

[airavata] branch develop updated: Supporting properties for parsers

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new eccf3bd  Supporting properties for parsers
eccf3bd is described below

commit eccf3bdc65f0b9407cd3eda2f8b376b42a9e9aff
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Tue Nov 6 18:12:03 2018 -0500

    Supporting properties for parsers
---
 .../helix/impl/task/parsing/DataParsingTask.java   | 75 ++++++++++++++--------
 .../helix/impl/workflow/ParserWorkflowManager.java | 74 ++++++++++++++-------
 2 files changed, 99 insertions(+), 50 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
index 03f9a56..90d499a 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
@@ -65,8 +65,11 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Implementation of the data parsing task.
@@ -111,54 +114,63 @@ public class DataParsingTask extends AbstractTask {
 
             logger.info("Downloading input files to local input directory");
 
+            Map<String, String> properties = new HashMap<>();
             for (ParserInput parserInput : parser.getInputFiles()) {
                 Optional<ParsingTaskInput> filteredInputOptional = parsingTaskInputs.getInputs().stream().filter(inp -> parserInput.getId().equals(inp.getId())).findFirst();
 
                 if (filteredInputOptional.isPresent()) {
 
                     ParsingTaskInput parsingTaskInput = filteredInputOptional.get();
-                    String inputDataProductUri = parsingTaskInput.getValue() != null ? parsingTaskInput.getValue() : getContextVariable(parsingTaskInput.getContextVariableName());
 
-                    if (inputDataProductUri == null || inputDataProductUri.isEmpty()) {
-                        logger.error("Data product uri could not be null or empty for input " + parsingTaskInput.getId()
-                                + " with name " + parserInput.getName());
-                        throw new TaskOnFailException("Data product uri could not be null or empty for input "
-                                + parsingTaskInput.getId() + " with name " + parserInput.getName(), true, null);
-                    }
-                    DataProductModel inputDataProduct = getRegistryServiceClient().getDataProduct(inputDataProductUri);
-                    List<DataReplicaLocationModel> replicaLocations = inputDataProduct.getReplicaLocations();
+                    String inputVal = parsingTaskInput.getValue() != null ? parsingTaskInput.getValue() : getContextVariable(parsingTaskInput.getContextVariableName());
+
+                    if ("PROPERTY".equals(parsingTaskInput.getType())) {
+                        properties.put(parsingTaskInput.getContextVariableName(), inputVal);
+                    } else if ("FILE".equals(parsingTaskInput.getType())) {
 
-                    boolean downloadPassed = false;
+                        String inputDataProductUri = inputVal;
+
+                        if (inputDataProductUri == null || inputDataProductUri.isEmpty()) {
+                            logger.error("Data product uri could not be null or empty for input " + parsingTaskInput.getId()
+                                    + " with name " + parserInput.getName());
+                            throw new TaskOnFailException("Data product uri could not be null or empty for input "
+                                    + parsingTaskInput.getId() + " with name " + parserInput.getName(), true, null);
+                        }
+                        DataProductModel inputDataProduct = getRegistryServiceClient().getDataProduct(inputDataProductUri);
+                        List<DataReplicaLocationModel> replicaLocations = inputDataProduct.getReplicaLocations();
 
-                    for (DataReplicaLocationModel replicaLocationModel : replicaLocations) {
-                        String storageResourceId = replicaLocationModel.getStorageResourceId();
-                        String remoteFilePath = new URI(replicaLocationModel.getFilePath()).getPath();
-                        String localFilePath = localInputDir + (localInputDir.endsWith(File.separator)? "" : File.separator)
-                                + parserInput.getName();
+                        boolean downloadPassed = false;
 
-                        downloadPassed = downloadFileFromStorageResource(storageResourceId, remoteFilePath, localFilePath, helper.getAdaptorSupport());
+                        for (DataReplicaLocationModel replicaLocationModel : replicaLocations) {
+                            String storageResourceId = replicaLocationModel.getStorageResourceId();
+                            String remoteFilePath = new URI(replicaLocationModel.getFilePath()).getPath();
+                            String localFilePath = localInputDir + (localInputDir.endsWith(File.separator) ? "" : File.separator)
+                                    + parserInput.getName();
 
-                        if (downloadPassed) {
-                            break;
+                            downloadPassed = downloadFileFromStorageResource(storageResourceId, remoteFilePath, localFilePath, helper.getAdaptorSupport());
+
+                            if (downloadPassed) {
+                                break;
+                            }
                         }
-                    }
 
-                    if (!downloadPassed) {
-                        logger.error("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri);
-                        throw new TaskOnFailException("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri, true, null);
+                        if (!downloadPassed) {
+                            logger.error("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri);
+                            throw new TaskOnFailException("Failed to download input file with id " + parserInput.getId() + " from data product uri " + inputDataProductUri, true, null);
+                        }
                     }
                 } else {
                     if (parserInput.isRequiredInput()) {
-                        logger.error("File download info with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available");
-                        throw new TaskOnFailException("File download info with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available", true, null);
+                        logger.error("Parser input with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available");
+                        throw new TaskOnFailException("Parser input with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available", true, null);
                     } else {
-                        logger.warn("File download info with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available. But it is not required");
+                        logger.warn("Parser input with id with id " + parserInput.getId() + " and name " + parserInput.getName() + " is not available. But it is not required");
                     }
                 }
             }
 
             logger.info("Running container with id " + containerId + " local input dir " + localInputDir + " local output dir " + localOutDir);
-            runContainer(parser, containerId, localInputDir, localOutDir);
+            runContainer(parser, containerId, localInputDir, localOutDir, properties);
 
             for (ParserOutput parserOutput : parser.getOutputFiles()) {
 
@@ -211,14 +223,21 @@ public class DataParsingTask extends AbstractTask {
 
     }
 
-    private void runContainer(Parser parser, String containerId, String localInputDir, String localOutputDir) {
+    private void runContainer(Parser parser, String containerId, String localInputDir, String localOutputDir, Map<String, String> properties) {
         DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();
 
         DockerClient dockerClient = DockerClientBuilder.getInstance(config).build();
 
         CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parser.getImageName()).withCmd("/bin/sh", "-c", parser.getExecutionCommand()).withName(containerId)
                 .withBinds(Bind.parse(localInputDir + ":" + parser.getInputDirPath()),
-                        Bind.parse(localOutputDir + ":" + parser.getOutputDirPath())).withTty(true).withAttachStdin(true).withAttachStdout(true).exec();
+                        Bind.parse(localOutputDir + ":" + parser.getOutputDirPath()))
+                .withTty(true)
+                .withAttachStdin(true)
+                .withAttachStdout(true).withEnv(properties.entrySet()
+                        .stream()
+                        .map(entry -> entry.getKey() + "=" + entry.getValue())
+                        .collect(Collectors.toList()))
+                .exec();
 
         logger.info("Created the container with id " + containerResponse.getId());
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 60403d2..309ed13 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -31,10 +31,7 @@ import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInputs;
 import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskOutput;
 import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskOutputs;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.parser.Parser;
-import org.apache.airavata.model.appcatalog.parser.ParserConnector;
-import org.apache.airavata.model.appcatalog.parser.ParsingTemplate;
-import org.apache.airavata.model.appcatalog.parser.ParsingTemplateInput;
+import org.apache.airavata.model.appcatalog.parser.*;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.process.ProcessModel;
@@ -194,20 +191,38 @@ public class ParserWorkflowManager extends WorkflowManager {
         ParsingTaskInputs inputs = new ParsingTaskInputs();
 
         for (ParsingTemplateInput templateInput : templateInputs) {
-            String expression = templateInput.getApplicationOutputName();
-            try {
-                ExperimentModel experiment = registryClient.getExperiment(completionMessage.getExperimentId());
-                Optional<OutputDataObjectType> outputDataObj = experiment.getExperimentOutputs().stream().filter(outputDataObjectType -> outputDataObjectType.getName().equals(expression)).findFirst();
-                if (outputDataObj.isPresent()) {
-                    ParsingTaskInput input = new ParsingTaskInput();
-                    input.setId(templateInput.getInputId());
-                    input.setValue(outputDataObj.get().getValue());
-                    inputs.addInput(input);
+
+            Optional<ParserInput> parserInputOp = parserInfo.getInputFiles().stream()
+                    .filter(inp -> inp.getId().equals(templateInput.getTargetInputId())).findFirst();
+
+            ParsingTaskInput input = new ParsingTaskInput();
+            input.setId(templateInput.getTargetInputId());
+            if (parserInputOp.isPresent()) {
+                input.setType(parserInputOp.get().getType().name());
+            } else {
+                throw new Exception("Failed to find an input with id " + templateInput.getTargetInputId());
+            }
+
+            if(templateInput.getApplicationOutputName() != null) {
+                String applicationOutputName = templateInput.getApplicationOutputName();
+                try {
+                    ExperimentModel experiment = registryClient.getExperiment(completionMessage.getExperimentId());
+                    Optional<OutputDataObjectType> expOutputData = experiment.getExperimentOutputs().stream()
+                            .filter(outputDataObjectType -> outputDataObjectType.getName().equals(applicationOutputName)).findFirst();
+
+                    if (expOutputData.isPresent()) {
+                        input.setValue(expOutputData.get().getValue());
+                    } else {
+                        throw new Exception("Could not find an experiment output with name " + applicationOutputName);
+                    }
+                } catch (TException e) {
+                    logger.error("Failed while fetching experiment " + completionMessage.getExperimentId());
+                    throw new Exception("Failed while fetching experiment " + completionMessage.getExperimentId());
                 }
-            } catch (TException e) {
-                logger.error("Failed while fetching experiment " + completionMessage.getExperimentId());
-                throw new Exception("Failed while fetching experiment " + completionMessage.getExperimentId());
+            } else {
+                input.setValue(processExpression(templateInput.getValue()));
             }
+            inputs.addInput(input);
         }
 
         parsingTask.setParsingTaskInputs(inputs);
@@ -226,6 +241,10 @@ public class ParserWorkflowManager extends WorkflowManager {
         return parsingTask;
     }
 
+    private String processExpression(String expression) {
+        return expression;
+    }
+
     private void createParserDagRecursively(List<AbstractTask> allTasks, Parser parentParserInfo, DataParsingTask parentTask, Map<String, Set<ParserConnector>> parentToChild,
                                             ProcessCompletionMessage completionMessage, RegistryService.Client registryClient) throws Exception {
         if (parentToChild.containsKey(parentParserInfo.getId())) {
@@ -245,12 +264,23 @@ public class ParserWorkflowManager extends WorkflowManager {
                 }
 
                 ParsingTaskInputs inputs = new ParsingTaskInputs();
-                connector.getConnectorInputs().forEach(mapping -> {
-                    ParsingTaskInput input = new ParsingTaskInput();
-                    input.setContextVariableName(connector.getParentParserId() + "-" + mapping.getParentOutputId());
-                    input.setId(mapping.getInputId());
-                    inputs.addInput(input);
-                });
+                for(ParserConnectorInput connectorInput : connector.getConnectorInputs()) {
+
+                    Optional<ParserInput> parserInputOp = childParserInfo.getInputFiles().stream()
+                            .filter(inp -> inp.getId().equals(connectorInput.getInputId())).findFirst();
+
+                    if (parserInputOp.isPresent()) {
+                        ParsingTaskInput input = new ParsingTaskInput();
+                        // Either context variable or value is set
+                        input.setContextVariableName(connector.getParentParserId() + "-" + connectorInput.getParentOutputId());
+                        input.setValue(processExpression(connectorInput.getValue()));
+                        input.setId(connectorInput.getInputId());
+                        input.setType(parserInputOp.get().getType().name());
+                        inputs.addInput(input);
+                    } else {
+                        throw new Exception("Failed to find an input with id " + connectorInput.getId());
+                    }
+                }
 
                 ParsingTaskOutputs outputs = new ParsingTaskOutputs();
                 childParserInfo.getOutputFiles().forEach(parserOutput -> {