You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/10/29 19:34:46 UTC

[airavata] branch develop updated (3cba34f -> e1d65eb)

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git.


    from 3cba34f  Supporting serializable task params in task framework
     new 3c6b3dd  Serializable parsing task input output
     new e1d65eb  Improving parser framework and fixing bugs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../helix/impl/task/parsing/DataParsingTask.java   | 138 ++++++++++++++-------
 .../task/parsing/models/ParsingTaskInputs.java     |  16 ++-
 .../task/parsing/models/ParsingTaskOutputs.java    |  16 ++-
 .../helix/impl/workflow/ParserWorkflowManager.java |  43 +++++--
 .../helix/impl/workflow/WorkflowManager.java       |   6 +-
 .../api/service/handler/RegistryServerHandler.java |   8 +-
 6 files changed, 170 insertions(+), 57 deletions(-)


[airavata] 02/02: Improving parser framework and fixing bugs

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit e1d65eb53930e4250fb9d754fde65d5596440f88
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Mon Oct 29 15:34:36 2018 -0400

    Improving parser framework and fixing bugs
---
 .../helix/impl/task/parsing/DataParsingTask.java   | 138 ++++++++++++++-------
 .../helix/impl/workflow/ParserWorkflowManager.java |  43 +++++--
 .../helix/impl/workflow/WorkflowManager.java       |   6 +-
 .../api/service/handler/RegistryServerHandler.java |   8 +-
 4 files changed, 140 insertions(+), 55 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
index 9836bda..37a1b56 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/DataParsingTask.java
@@ -42,14 +42,13 @@ import org.apache.airavata.helix.task.api.annotation.TaskDef;
 import org.apache.airavata.helix.task.api.annotation.TaskParam;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
 import org.apache.airavata.model.appcatalog.parser.ParserInfo;
 import org.apache.airavata.model.appcatalog.parser.ParserInput;
 import org.apache.airavata.model.appcatalog.parser.ParserOutput;
-import org.apache.airavata.model.credential.store.CredentialSummary;
-import org.apache.airavata.model.credential.store.SummaryType;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
-import org.apache.airavata.model.data.replica.DataProductModel;
-import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
+import org.apache.airavata.model.data.replica.*;
 import org.apache.airavata.registry.api.RegistryService;
 import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
 import org.apache.airavata.registry.api.exception.RegistryServiceException;
@@ -60,12 +59,9 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Iterator;
+import java.net.URI;
 import java.util.List;
 import java.util.Optional;
 
@@ -91,6 +87,12 @@ public class DataParsingTask extends AbstractTask {
     @TaskParam(name = "Gateway ID")
     private String gatewayId;
 
+    @TaskParam(name = "Group Resource Profile Id")
+    private String groupResourceProfileId;
+
+    @TaskParam(name = "Local data dir")
+    private String localDataDir;
+
     @Override
     public TaskResult onRun(TaskHelper helper) {
         logger.info("Starting data parsing task " + getTaskId());
@@ -112,7 +114,14 @@ public class DataParsingTask extends AbstractTask {
                 if (filteredInputOptional.isPresent()) {
 
                     ParsingTaskInput parsingTaskInput = filteredInputOptional.get();
-                    String inputDataProductUri = getContextVariable(parsingTaskInput.getContextVariableName());
+                    String inputDataProductUri = parsingTaskInput.getValue() != null ? parsingTaskInput.getValue() : getContextVariable(parsingTaskInput.getContextVariableName());
+
+                    if (inputDataProductUri == null || inputDataProductUri.isEmpty()) {
+                        logger.error("Data product uri could not be null or empty for input " + parsingTaskInput.getId()
+                                + " with name " + parserInput.getName());
+                        throw new TaskOnFailException("Data product uri could not be null or empty for input "
+                                + parsingTaskInput.getId() + " with name " + parserInput.getName(), true, null);
+                    }
                     DataProductModel inputDataProduct = getRegistryServiceClient().getDataProduct(inputDataProductUri);
                     List<DataReplicaLocationModel> replicaLocations = inputDataProduct.getReplicaLocations();
 
@@ -120,7 +129,7 @@ public class DataParsingTask extends AbstractTask {
 
                     for (DataReplicaLocationModel replicaLocationModel : replicaLocations) {
                         String storageResourceId = replicaLocationModel.getStorageResourceId();
-                        String remoteFilePath = replicaLocationModel.getFilePath();
+                        String remoteFilePath = new URI(replicaLocationModel.getFilePath()).getPath();
                         String localFilePath = localInputDir + (localInputDir.endsWith(File.separator)? "" : File.separator)
                                 + parserInput.getName();
 
@@ -160,7 +169,7 @@ public class DataParsingTask extends AbstractTask {
                     String remoteFilePath = "parsers" + File.separator + getTaskId() + File.separator + "outputs" + File.separator + parserOutput.getName();
 
                     if (new File(localFilePath).exists()) {
-                        uploadFileToStorageResource(parsingTaskOutput.getStorageResourceId(), remoteFilePath, localFilePath, helper.getAdaptorSupport());
+                        uploadFileToStorageResource(parsingTaskOutput, remoteFilePath, localFilePath, helper.getAdaptorSupport());
                     } else if (parserOutput.isRequiredFile()) {
                         logger.error("Expected output file " + localFilePath + " can not be found");
                         throw new TaskOnFailException("Expected output file " + localFilePath + " can not be found", false, null);
@@ -200,47 +209,50 @@ public class DataParsingTask extends AbstractTask {
     }
 
     private void runContainer(ParserInfo parserInfo, String containerId, String localInputDir, String localOutputDir) {
-        DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder()
-                .withDockerHost("tcp://localhost:2376");
+        DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();
 
         DockerClient dockerClient = DockerClientBuilder.getInstance(config).build();
-        CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parserInfo.getImageName()).withCmd(parserInfo.getExecutionCommand()).withName(containerId)
+
+        CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parserInfo.getImageName()).withCmd("/bin/sh", "-c", parserInfo.getExecutionCommand()).withName(containerId)
                 .withBinds(Bind.parse(localInputDir + ":" + parserInfo.getInputDirPath()),
-                        Bind.parse(localOutputDir + ":" + parserInfo.getOutputDirPath())).withTty(true).withAttachStdin(true)
-                .exec();
+                        Bind.parse(localOutputDir + ":" + parserInfo.getOutputDirPath())).withTty(true).exec();
+
+
+        logger.info("Created the container with id " + containerResponse.getId());
+
         if (containerResponse.getWarnings() != null) {
             StringBuilder warningStr = new StringBuilder();
             for (String w : containerResponse.getWarnings()) {
                 warningStr.append(w).append(",");
             }
-            logger.warn("Container warnings : " + warningStr);
+            logger.warn("Container " + containerResponse.getId() + " warnings : " + warningStr);
+        } else {
+            logger.info("Starting container with id " + containerResponse.getId());
+            dockerClient.startContainerCmd(containerResponse.getId()).exec();
+
         }
+
+        dockerClient.removeContainerCmd(containerResponse.getId()).exec();
+        logger.info("Successfully removed container with id " + containerResponse.getId());
     }
 
     private StorageResourceAdaptor getStorageResourceAdaptor(String storageResourceId, AdaptorSupport adaptorSupport) throws TaskOnFailException, TException, AgentException {
-        List<CredentialSummary> allCredentialSummaryForGateway = getCredentialServiceClient()
-                .getAllCredentialSummaryForGateway(SummaryType.SSH, gatewayId);
-
-        if (allCredentialSummaryForGateway == null || allCredentialSummaryForGateway.isEmpty()) {
-            logger.error("Could not find SSH summary for gateway " + gatewayId);
-            throw new TaskOnFailException("Could not find SSH summary for gateway " + gatewayId, false, null);
-        }
 
-        StoragePreference gatewayStoragePreference = getRegistryServiceClient()
-                .getGatewayStoragePreference(gatewayId, storageResourceId);
+        GroupResourceProfile groupResourceProfile = getRegistryServiceClient().getGroupResourceProfile(groupResourceProfileId);
+        StoragePreference gatewayStoragePreference = getRegistryServiceClient().getGatewayStoragePreference(gatewayId, storageResourceId);
 
+        String token = gatewayStoragePreference.getResourceSpecificCredentialStoreToken();
+        if (token == null || token.isEmpty()) {
+            token = groupResourceProfile.getDefaultCredentialStoreToken();
+        }
         if (gatewayStoragePreference == null) {
             logger.error("Could not find a gateway storage preference for stogate " + storageResourceId + " gateway id " + gatewayId);
             throw new TaskOnFailException("Could not find a gateway storage preference for stogate " + storageResourceId + " gateway id " + gatewayId, false, null);
         }
 
-        StorageResourceAdaptor storageResourceAdaptor = adaptorSupport.fetchStorageAdaptor(gatewayId,
-                storageResourceId, DataMovementProtocol.SCP,
-                Optional.ofNullable(gatewayStoragePreference.getResourceSpecificCredentialStoreToken())
-                        .orElse(allCredentialSummaryForGateway.get(0).getToken()),
+        return adaptorSupport.fetchStorageAdaptor(gatewayId, storageResourceId, DataMovementProtocol.SCP, token,
                 gatewayStoragePreference.getLoginUserName());
 
-        return storageResourceAdaptor;
     }
 
     private boolean downloadFileFromStorageResource(String storageResourceId, String remoteFilePath, String localFilePath, AdaptorSupport adaptorSupport) {
@@ -257,26 +269,53 @@ public class DataParsingTask extends AbstractTask {
         }
     }
 
-    private boolean uploadFileToStorageResource(String storageResourceId, String remoteFilePath, String localFilePath, AdaptorSupport adaptorSupport) {
-        logger.info("Uploading from local path " + localFilePath + " to remote path " + remoteFilePath + " of storage resource " + storageResourceId);
+    private void uploadFileToStorageResource(ParsingTaskOutput parsingTaskOutput, String remoteFilePath, String localFilePath, AdaptorSupport adaptorSupport) throws TaskOnFailException {
+        logger.info("Uploading from local path " + localFilePath + " to remote path " + remoteFilePath + " of storage resource " + parsingTaskOutput.getStorageResourceId());
         try {
-            StoragePreference gatewayStoragePreference = getRegistryServiceClient().getGatewayStoragePreference(gatewayId, storageResourceId);
+            StoragePreference gatewayStoragePreference = getRegistryServiceClient().getGatewayStoragePreference(gatewayId, parsingTaskOutput.getStorageResourceId());
+            StorageResourceDescription storageResource = getRegistryServiceClient().getStorageResource(parsingTaskOutput.getStorageResourceId());
+
             String remoteFileRoot = gatewayStoragePreference.getFileSystemRootLocation();
             remoteFilePath = remoteFileRoot + (remoteFileRoot.endsWith(File.separator) ? "" : File.separator) + remoteFilePath;
-            StorageResourceAdaptor storageResourceAdaptor = getStorageResourceAdaptor(storageResourceId, adaptorSupport);
+            StorageResourceAdaptor storageResourceAdaptor = getStorageResourceAdaptor(parsingTaskOutput.getStorageResourceId(), adaptorSupport);
             storageResourceAdaptor.createDirectory(new File(remoteFilePath).getParent(), true);
             storageResourceAdaptor.uploadFile(localFilePath, remoteFilePath);
-            return true;
+
+            logger.info("Uploading completed. Registering data product for path " + remoteFilePath);
+
+            DataProductModel dataProductModel = new DataProductModel();
+            dataProductModel.setGatewayId(getGatewayId());
+            dataProductModel.setOwnerName("ParserTask");
+            dataProductModel.setProductName(parsingTaskOutput.getId());
+            dataProductModel.setDataProductType(DataProductType.FILE);
+
+            DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+            replicaLocationModel.setStorageResourceId(parsingTaskOutput.getStorageResourceId());
+            replicaLocationModel.setReplicaName("Parsing task output " + parsingTaskOutput.getId());
+            replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+            replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+
+            URI destinationURI = new URI("file", gatewayStoragePreference.getLoginUserName(),
+                    storageResource.getHostName(), 22, remoteFilePath, null, null);
+
+            replicaLocationModel.setFilePath(destinationURI.toString());
+            dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+            String productUri = getRegistryServiceClient().registerDataProduct(dataProductModel);
+
+            logger.info("Data product is " + productUri + " for path " + remoteFilePath);
+
+            setContextVariable(parsingTaskOutput.getContextVariableName(), productUri);
         } catch (Exception e) {
             logger.error("Failed to upload from local path " + localFilePath + " to remote path " + remoteFilePath +
-                    " of storage resource " + storageResourceId, e);
-            return false;
+                    " of storage resource " + parsingTaskOutput.getStorageResourceId(), e);
+            throw new TaskOnFailException("Failed to upload from local path " + localFilePath + " to remote path " + remoteFilePath +
+                    " of storage resource " + parsingTaskOutput.getStorageResourceId(), false, e);
         }
     }
 
     private String createLocalInputDir(String containerName) throws TaskOnFailException {
-        String localInpDir = ServerSettings.getLocalDataLocation();
-        localInpDir = (localInpDir.endsWith(File.separator) ? localInpDir : localInpDir + File.separator) +
+        String localInpDir = (localDataDir.endsWith(File.separator) ? localDataDir : localDataDir + File.separator) +
                 "parsers" + File.separator + containerName + File.separator + "data" + File.separator + "input" + File.separator;
         try {
             FileUtils.forceMkdir(new File(localInpDir));
@@ -288,8 +327,7 @@ public class DataParsingTask extends AbstractTask {
     }
 
     private String createLocalOutputDir(String containerName) throws TaskOnFailException {
-        String localOutDir = ServerSettings.getLocalDataLocation();
-        localOutDir = (localOutDir.endsWith(File.separator) ? localOutDir : localOutDir + File.separator) +
+        String localOutDir = (localDataDir.endsWith(File.separator) ? localDataDir : localDataDir + File.separator) +
                 "parsers" + File.separator + containerName + File.separator + "data" + File.separator + "output" + File.separator;
         try {
             FileUtils.forceMkdir(new File(localOutDir));
@@ -352,4 +390,20 @@ public class DataParsingTask extends AbstractTask {
     public void setGatewayId(String gatewayId) {
         this.gatewayId = gatewayId;
     }
+
+    public String getGroupResourceProfileId() {
+        return groupResourceProfileId;
+    }
+
+    public void setGroupResourceProfileId(String groupResourceProfileId) {
+        this.groupResourceProfileId = groupResourceProfileId;
+    }
+
+    public String getLocalDataDir() {
+        return localDataDir;
+    }
+
+    public void setLocalDataDir(String localDataDir) {
+        this.localDataDir = localDataDir;
+    }
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 18ea22c..86517bf 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -35,7 +35,7 @@ import org.apache.airavata.model.appcatalog.parser.DagElement;
 import org.apache.airavata.model.appcatalog.parser.ParserInfo;
 import org.apache.airavata.model.appcatalog.parser.ParsingTemplate;
 import org.apache.airavata.model.appcatalog.parser.ParsingTemplateInput;
-import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.registry.api.RegistryService;
@@ -57,7 +57,7 @@ public class ParserWorkflowManager extends WorkflowManager {
 
     private final static Logger logger = LoggerFactory.getLogger(ParserWorkflowManager.class);
 
-    private String parserStorageResourceId = "";
+    private String parserStorageResourceId = "pgadev.scigap.org_7ddf28fd-d503-4ff8-bbc5-3279a7c3b99e";
 
     public ParserWorkflowManager() throws ApplicationSettingsException {
         super(ServerSettings.getSetting("parser.workflow.manager.name"));
@@ -66,7 +66,7 @@ public class ParserWorkflowManager extends WorkflowManager {
     public static void main(String[] args) throws Exception {
         ParserWorkflowManager manager = new ParserWorkflowManager();
         manager.init();
-        manager.runConsumer();
+        manager.test();
     }
 
     private void init() throws Exception {
@@ -177,11 +177,19 @@ public class ParserWorkflowManager extends WorkflowManager {
         }
     }
 
-    private DataParsingTask createParentTask(ParserInfo parserInfo, ProcessCompletionMessage completionMessage, List<ParsingTemplateInput> templateInputs, RegistryService.Client registryClient) {
+    private DataParsingTask createParentTask(ParserInfo parserInfo, ProcessCompletionMessage completionMessage,
+                                             List<ParsingTemplateInput> templateInputs, RegistryService.Client registryClient) throws Exception {
         DataParsingTask parsingTask = new DataParsingTask();
-        parsingTask.setTaskId(completionMessage.getExperimentId() + "-" + parserInfo.getId() + "-" + UUID.randomUUID().toString());
+        parsingTask.setTaskId(normalizeTaskId(completionMessage.getExperimentId() + "-" + parserInfo.getId() + "-" + UUID.randomUUID().toString()));
         parsingTask.setGatewayId(completionMessage.getGatewayId());
         parsingTask.setParserInfoId(parserInfo.getId());
+        parsingTask.setLocalDataDir("/tmp");
+        try {
+            parsingTask.setGroupResourceProfileId(registryClient.getProcess(completionMessage.getProcessId()).getGroupResourceProfileId());
+        } catch (TException e) {
+            logger.error("Failed while fetching process model for process id  " + completionMessage.getProcessId());
+            throw new Exception("Failed while fetching process model for process id  " + completionMessage.getProcessId());
+        }
 
         ParsingTaskInputs inputs = new ParsingTaskInputs();
 
@@ -189,15 +197,16 @@ public class ParserWorkflowManager extends WorkflowManager {
             String expression = templateInput.getExpression();
             try {
                 ExperimentModel experiment = registryClient.getExperiment(completionMessage.getExperimentId());
-                Optional<InputDataObjectType> inputDataObj = experiment.getExperimentInputs().stream().filter(inputDataObjectType -> inputDataObjectType.getName().equals(expression)).findFirst();
-                if (inputDataObj.isPresent()) {
+                Optional<OutputDataObjectType> outputDataObj = experiment.getExperimentOutputs().stream().filter(outputDataObjectType -> outputDataObjectType.getName().equals(expression)).findFirst();
+                if (outputDataObj.isPresent()) {
                     ParsingTaskInput input = new ParsingTaskInput();
                     input.setId(templateInput.getInputId());
-                    input.setValue(inputDataObj.get().getValue());
+                    input.setValue(outputDataObj.get().getValue());
                     inputs.addInput(input);
                 }
             } catch (TException e) {
                 logger.error("Failed while fetching experiment " + completionMessage.getExperimentId());
+                throw new Exception("Failed while fetching experiment " + completionMessage.getExperimentId());
             }
         }
 
@@ -224,9 +233,16 @@ public class ParserWorkflowManager extends WorkflowManager {
             for (DagElement dagElement : parentToChild.get(parentParserInfo.getId())) {
                 ParserInfo childParserInfo = registryClient.getParserInfo(dagElement.getChildParserId());
                 DataParsingTask parsingTask = new DataParsingTask();
-                parsingTask.setTaskId(completionMessage.getExperimentId() + "-" + childParserInfo.getId() + "-" + UUID.randomUUID().toString());
+                parsingTask.setTaskId(normalizeTaskId(completionMessage.getExperimentId() + "-" + childParserInfo.getId() + "-" + UUID.randomUUID().toString()));
                 parsingTask.setGatewayId(completionMessage.getGatewayId());
                 parsingTask.setParserInfoId(childParserInfo.getId());
+                parsingTask.setLocalDataDir("/tmp");
+                try {
+                    parsingTask.setGroupResourceProfileId(registryClient.getProcess(completionMessage.getProcessId()).getGroupResourceProfileId());
+                } catch (TException e) {
+                    logger.error("Failed while fetching process model for process id  " + completionMessage.getProcessId());
+                    throw new Exception("Failed while fetching process model for process id  " + completionMessage.getProcessId());
+                }
 
                 ParsingTaskInputs inputs = new ParsingTaskInputs();
                 dagElement.getInputOutputMapping().forEach(mapping -> {
@@ -285,4 +301,13 @@ public class ParserWorkflowManager extends WorkflowManager {
             consumer.commitAsync();
         }
     }
+
+    private void test() {
+        ProcessCompletionMessage completionMessage = new ProcessCompletionMessage();
+        completionMessage.setExperimentId("Echo_on_Oct_24,_2018_10:03_AM_b67b6a7e-d41f-46c2-a756-d8cf91155e1e");
+        completionMessage.setGatewayId("seagrid");
+        completionMessage.setProcessId("PROCESS_be7d1946-e404-447e-a7e8-5581aaef2ef8");
+        process(completionMessage);
+    }
+
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
index 7413ad5..f610721 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
@@ -43,7 +43,7 @@ public class WorkflowManager {
     protected void initComponents() throws Exception {
         initRegistryClientPool();
         initWorkflowOperatorr();
-        initStatusPublisher();
+        //initStatusPublisher();
         initCuratorClient();
     }
 
@@ -121,4 +121,8 @@ public class WorkflowManager {
         msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
         getStatusPublisher().publish(msgCtx);
     }
+
+    public String normalizeTaskId(String taskId) {
+        return taskId.replace(":", "-").replace(",", "-");
+    }
 }
diff --git a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
index 6372389..e305cc7 100644
--- a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
+++ b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
@@ -4949,7 +4949,7 @@ public class RegistryServerHandler implements RegistryService.Iface {
         parserInfo.setId(parserId);
         switch (parserId) {
             case "001":
-                parserInfo.setExecutionCommand("/opt/execute.txt");
+                parserInfo.setExecutionCommand("/opt/execute.sh");
                 parserInfo.setImageName("dimuthuupe/uppercase:v1");
                 parserInfo.setInputDirPath("/opt/inputs");
                 parserInfo.setOutputDirPath("/opt/outputs");
@@ -4968,7 +4968,7 @@ public class RegistryServerHandler implements RegistryService.Iface {
                 parserInfo.setOutputFiles(Collections.singletonList(output1));
                 break;
             case "002":
-                parserInfo.setExecutionCommand("/opt/execute.txt");
+                parserInfo.setExecutionCommand("/opt/execute.sh");
                 parserInfo.setImageName("dimuthuupe/lowercase:v1");
                 parserInfo.setInputDirPath("/opt/inputs");
                 parserInfo.setOutputDirPath("/opt/outputs");
@@ -4998,8 +4998,10 @@ public class RegistryServerHandler implements RegistryService.Iface {
     @Override
     public ParsingTemplate getParsingTemplate(String templateId) throws RegistryServiceException, TException {
         ParsingTemplate template = new ParsingTemplate();
+        template.setId("001");
+        template.setApplicationInterface("sample");
         ParsingTemplateInput templateInput1 =  new ParsingTemplateInput();
-        templateInput1.setExpression("Echo-Out");
+        templateInput1.setExpression("Standard Out");
         templateInput1.setInputId("001");
 
         template.addToInitialInputs(templateInput1);


[airavata] 01/02: Serializable parsing task input output

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 3c6b3dd4d359cde18036e8df3621a54be214385f
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Mon Oct 29 15:33:22 2018 -0400

    Serializable parsing task input output
---
 .../impl/task/parsing/models/ParsingTaskInputs.java      | 16 +++++++++++++++-
 .../impl/task/parsing/models/ParsingTaskOutputs.java     | 16 +++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskInputs.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskInputs.java
index 812f3b7..f710be5 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskInputs.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskInputs.java
@@ -1,9 +1,12 @@
 package org.apache.airavata.helix.impl.task.parsing.models;
 
+import com.google.gson.Gson;
+import org.apache.airavata.helix.task.api.TaskParamType;
+
 import java.util.ArrayList;
 import java.util.List;
 
-public class ParsingTaskInputs {
+public class ParsingTaskInputs implements TaskParamType {
 
     private List<ParsingTaskInput> inputs = new ArrayList<>();
 
@@ -18,4 +21,15 @@ public class ParsingTaskInputs {
     public void addInput(ParsingTaskInput input) {
         this.inputs.add(input);
     }
+
+    @Override
+    public String serialize() {
+        return new Gson().toJson(this);
+    }
+
+    @Override
+    public void deserialize(String content) {
+        ParsingTaskInputs parsingTaskInputs = new Gson().fromJson(content, ParsingTaskInputs.class);
+        this.inputs = parsingTaskInputs.getInputs();
+    }
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskOutputs.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskOutputs.java
index cdda0b3..d0c0820 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskOutputs.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/parsing/models/ParsingTaskOutputs.java
@@ -1,9 +1,12 @@
 package org.apache.airavata.helix.impl.task.parsing.models;
 
+import com.google.gson.Gson;
+import org.apache.airavata.helix.task.api.TaskParamType;
+
 import java.util.ArrayList;
 import java.util.List;
 
-public class ParsingTaskOutputs {
+public class ParsingTaskOutputs implements TaskParamType {
     private List<ParsingTaskOutput> outputs = new ArrayList<>();
 
     public List<ParsingTaskOutput> getOutputs() {
@@ -17,4 +20,15 @@ public class ParsingTaskOutputs {
     public void addOutput(ParsingTaskOutput output) {
         outputs.add(output);
     }
+
+    @Override
+    public String serialize() {
+        return new Gson().toJson(this);
+    }
+
+    @Override
+    public void deserialize(String content) {
+        ParsingTaskOutputs parsingTaskOutputs = new Gson().fromJson(content, ParsingTaskOutputs.class);
+        this.outputs = parsingTaskOutputs.getOutputs();
+    }
 }