You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/08/24 01:51:24 UTC

[airavata] branch staging updated: Optimizing task retry logic

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new c905ef5  Optimizing task retry logic
c905ef5 is described below

commit c905ef59739bb9ad765457ca9baa04e6d09f882e
Author: dimuthu <di...@gmail.com>
AuthorDate: Thu Aug 23 21:51:16 2018 -0400

    Optimizing task retry logic
---
 .../airavata/helix/impl/task/AiravataTask.java     | 83 ++++++++++++++--------
 .../airavata/helix/impl/task/env/EnvSetupTask.java |  2 +-
 .../helix/impl/task/staging/DataStagingTask.java   |  8 +--
 .../impl/task/staging/InputDataStagingTask.java    |  6 +-
 .../impl/task/staging/OutputDataStagingTask.java   |  2 +-
 .../apache/airavata/helix/core/AbstractTask.java   | 10 +++
 .../airavata/helix/core/util/MonitoringUtil.java   | 23 ++++++
 7 files changed, 94 insertions(+), 40 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index fa423bc..d2e03d8 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -90,42 +90,63 @@ public abstract class AiravataTask extends AbstractTask {
     }
 
     protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
+        int currentRetryCount = 0;
+        try {
+            currentRetryCount = getCurrentRetryCount();
+        } catch (Exception e) {
+            logger.error("Failed to obtain current retry count. So failing the task permanently", e);
+            fatal = true;
+        }
 
-        ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
-        StringWriter errors = new StringWriter();
+        logger.warn("Task failed with fatal = " + fatal + ".  Current retry count " + currentRetryCount);
 
-        String errorCode = UUID.randomUUID().toString();
-        String errorMessage = "Error Code : " + errorCode + ", Task " + getTaskId() + " failed due to " + reason +
-                (error == null ? "" : ", " + error.getMessage());
+        if (currentRetryCount < getRetryCount() && !fatal) {
+            try {
+                markNewRetry();
+            } catch (Exception e) {
+                logger.error("Failed to mark retry. So failing the task permanently", e);
+                fatal = true;
+            }
+        }
 
-        // wrapping from new error object with error code
-        error = new TaskOnFailException(errorMessage, true, error);
+        if (currentRetryCount >= getRetryCount() || fatal) {
+            ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
+            StringWriter errors = new StringWriter();
 
-        status.setReason(errorMessage);
-        errors.write(ExceptionUtils.getStackTrace(error));
-        logger.error(errorMessage, error);
+            String errorCode = UUID.randomUUID().toString();
+            String errorMessage = "Error Code : " + errorCode + ", Task " + getTaskId() + " failed due to " + reason +
+                    (error == null ? "" : ", " + error.getMessage());
 
-        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        if (getTaskContext() != null) { // task context could be null if the initialization failed
-            getTaskContext().setProcessStatus(status);
-        } else {
-            logger.warn("Task context is null. So can not store the process status in the context");
-        }
+            // wrapping from new error object with error code
+            error = new TaskOnFailException(errorMessage, true, error);
 
-        ErrorModel errorModel = new ErrorModel();
-        errorModel.setUserFriendlyMessage(reason);
-        errorModel.setActualErrorMessage(errors.toString());
-        errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+            status.setReason(errorMessage);
+            errors.write(ExceptionUtils.getStackTrace(error));
+            logger.error(errorMessage, error);
 
-        if (!skipTaskStatusPublish) {
-            publishTaskState(TaskState.FAILED);
-            saveAndPublishProcessStatus(taskContext != null ? taskContext.getProcessStatus() : status);
-            saveExperimentError(errorModel);
-            saveProcessError(errorModel);
-            saveTaskError(errorModel);
-        }
+            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            if (getTaskContext() != null) { // task context could be null if the initialization failed
+                getTaskContext().setProcessStatus(status);
+            } else {
+                logger.warn("Task context is null. So can not store the process status in the context");
+            }
+
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setUserFriendlyMessage(reason);
+            errorModel.setActualErrorMessage(errors.toString());
+            errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
 
-        return onFail(errorMessage, fatal);
+            if (!skipTaskStatusPublish) {
+                publishTaskState(TaskState.FAILED);
+                saveAndPublishProcessStatus(taskContext != null ? taskContext.getProcessStatus() : status);
+                saveExperimentError(errorModel);
+                saveProcessError(errorModel);
+                saveTaskError(errorModel);
+            }
+            return onFail(errorMessage, fatal);
+        } else {
+            return onFail("Handover back to helix engine to retry", fatal);
+        }
     }
 
     protected void saveAndPublishProcessStatus(ProcessState state) {
@@ -254,7 +275,7 @@ public abstract class AiravataTask extends AbstractTask {
     }
 
     @SuppressWarnings("WeakerAccess")
-    protected void saveExperimentError(ErrorModel errorModel) {
+    private void saveExperimentError(ErrorModel errorModel) {
         try {
             errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
             getRegistryServiceClient().addErrors("EXPERIMENT_ERROR", errorModel, experimentId);
@@ -265,7 +286,7 @@ public abstract class AiravataTask extends AbstractTask {
     }
 
     @SuppressWarnings("WeakerAccess")
-    protected void saveProcessError(ErrorModel errorModel) {
+    private void saveProcessError(ErrorModel errorModel) {
         try {
             errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
             getRegistryServiceClient().addErrors("PROCESS_ERROR", errorModel, getProcessId());
@@ -275,7 +296,7 @@ public abstract class AiravataTask extends AbstractTask {
     }
 
     @SuppressWarnings("WeakerAccess")
-    protected void saveTaskError(ErrorModel errorModel) {
+    private void saveTaskError(ErrorModel errorModel) {
         try {
             errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
             getRegistryServiceClient().addErrors("TASK_ERROR", errorModel, getTaskId());
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
index 6be4383..d6b00c5 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
@@ -52,7 +52,7 @@ public class EnvSetupTask extends AiravataTask {
             return onSuccess("Envi setup task successfully completed " + getTaskId());
 
         } catch (Exception e) {
-            return onFail("Failed to setup environment of task " + getTaskId(), true, e);
+            return onFail("Failed to setup environment of task " + getTaskId(), false, e);
         }
     }
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index fd55d70..fe57776 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -145,7 +145,7 @@ public abstract class DataStagingTask extends AiravataTask {
             }
         } catch (AgentException e) {
             logger.error("Error while checking the file " + sourcePath + " existence");
-            throw new TaskOnFailException("Error while checking the file " + sourcePath + " existence", true, e);
+            throw new TaskOnFailException("Error while checking the file " + sourcePath + " existence", false, e);
         }
 
         String localSourceFilePath = getLocalDataPath(fileName);
@@ -157,7 +157,7 @@ public abstract class DataStagingTask extends AiravataTask {
                 logger.info("Output file downloaded to " + localSourceFilePath);
             } catch (AgentException e) {
                 throw new TaskOnFailException("Failed downloading output file " + sourcePath + " to the local path " +
-                        localSourceFilePath, true, e);
+                        localSourceFilePath, false, e);
             }
 
             File localFile = new File(localSourceFilePath);
@@ -167,7 +167,7 @@ public abstract class DataStagingTask extends AiravataTask {
                     return false;
                 }
             } else {
-                throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, true, null);
+                throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
             }
             // Uploading output file to the storage resource
             try {
@@ -176,7 +176,7 @@ public abstract class DataStagingTask extends AiravataTask {
                 logger.info("Output file uploaded to " + destPath);
             } catch (AgentException e) {
                 throw new TaskOnFailException("Failed uploading the output file to " + destPath + " from local path " +
-                        localSourceFilePath, true, e);
+                        localSourceFilePath, false, e);
             }
 
             return true;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index 71e6a54..298925d 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -100,7 +100,7 @@ public class InputDataStagingTask extends DataStagingTask {
                     storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
                     logger.info("Input file downloaded to " + localSourceFilePath);
                 } catch (AgentException e) {
-                    throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, true, e);
+                    throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
                 }
 
                 File localFile = new File(localSourceFilePath);
@@ -110,7 +110,7 @@ public class InputDataStagingTask extends DataStagingTask {
                         return onFail("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
                     }
                 } else {
-                    throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, true, null);
+                    throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
                 }
 
                 // Uploading input file to the compute resource
@@ -119,7 +119,7 @@ public class InputDataStagingTask extends DataStagingTask {
                     adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
                     logger.info("Input file uploaded to " + destinationURI.getPath());
                 } catch (AgentException e) {
-                    throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, true, e);
+                    throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
                 }
 
             } finally {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 200a073..1f66a1a 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -124,7 +124,7 @@ public class OutputDataStagingTask extends DataStagingTask {
                     }
 
                 } catch (AgentException e) {
-                    throw new TaskOnFailException("Failed to fetch the file list from extension " + sourceFileName, true, e);
+                    throw new TaskOnFailException("Failed to fetch the file list from extension " + sourceFileName, false, e);
                 }
 
                 for (String temp : fileNames) {
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index f55fd48..0dabe59 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -22,6 +22,7 @@ package org.apache.airavata.helix.core;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.core.participant.HelixParticipant;
+import org.apache.airavata.helix.core.util.MonitoringUtil;
 import org.apache.airavata.helix.core.util.TaskUtil;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
@@ -174,6 +175,15 @@ public abstract class AbstractTask extends UserContentStore implements Task {
         return this;
     }
 
+    protected int getCurrentRetryCount() throws Exception {
+        return MonitoringUtil.getTaskRetryCount(curatorClient, taskId);
+    }
+
+    protected void markNewRetry() throws Exception {
+        MonitoringUtil.increaseTaskRetryCount(curatorClient, taskId);
+    }
+
+
     public int getRetryCount() {
         return retryCount;
     }
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
index e427c16..b772d0a 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
@@ -27,9 +27,32 @@ public class MonitoringUtil {
     private static final String JOB_ID = "/jobId";
     private static final String JOB_NAME = "/jobName";
     private static final String WORKFLOWS = "/workflows";
+    private static final String RETRY = "/retry";
 
     public static final String CANCEL = "cancel";
 
+    public static int getTaskRetryCount(CuratorFramework curatorClient, String taskId) throws Exception {
+        String path = TASK + taskId + RETRY;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = curatorClient.getData().forPath(path);
+            return Integer.parseInt(new String(processBytes));
+        } else {
+            return 1;
+        }
+    }
+
+    public static void increaseTaskRetryCount(CuratorFramework curatorClient, String takId) throws Exception {
+        String path = TASK + takId + RETRY;
+        int currentRetryCount = 2;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = curatorClient.getData().forPath(path);
+            currentRetryCount = Integer.parseInt(new String(processBytes)) + 1;
+            curatorClient.delete().forPath(path);
+        }
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                path , (currentRetryCount + "").getBytes());
+    }
+
     public static String getExperimentIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
         String path = MONITORING + jobId + EXPERIMENT;
         if (curatorClient.checkExists().forPath(path) != null) {