You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/08/24 01:51:24 UTC
[airavata] branch staging updated: Optimizing task retry logic
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new c905ef5 Optimizing task retry logic
c905ef5 is described below
commit c905ef59739bb9ad765457ca9baa04e6d09f882e
Author: dimuthu <di...@gmail.com>
AuthorDate: Thu Aug 23 21:51:16 2018 -0400
Optimizing task retry logic
---
.../airavata/helix/impl/task/AiravataTask.java | 83 ++++++++++++++--------
.../airavata/helix/impl/task/env/EnvSetupTask.java | 2 +-
.../helix/impl/task/staging/DataStagingTask.java | 8 +--
.../impl/task/staging/InputDataStagingTask.java | 6 +-
.../impl/task/staging/OutputDataStagingTask.java | 2 +-
.../apache/airavata/helix/core/AbstractTask.java | 10 +++
.../airavata/helix/core/util/MonitoringUtil.java | 23 ++++++
7 files changed, 94 insertions(+), 40 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index fa423bc..d2e03d8 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -90,42 +90,63 @@ public abstract class AiravataTask extends AbstractTask {
}
protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
+ int currentRetryCount = 0;
+ try {
+ currentRetryCount = getCurrentRetryCount();
+ } catch (Exception e) {
+ logger.error("Failed to obtain current retry count. So failing the task permanently", e);
+ fatal = true;
+ }
- ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
- StringWriter errors = new StringWriter();
+ logger.warn("Task failed with fatal = " + fatal + ". Current retry count " + currentRetryCount);
- String errorCode = UUID.randomUUID().toString();
- String errorMessage = "Error Code : " + errorCode + ", Task " + getTaskId() + " failed due to " + reason +
- (error == null ? "" : ", " + error.getMessage());
+ if (currentRetryCount < getRetryCount() && !fatal) {
+ try {
+ markNewRetry();
+ } catch (Exception e) {
+ logger.error("Failed to mark retry. So failing the task permanently", e);
+ fatal = true;
+ }
+ }
- // wrapping from new error object with error code
- error = new TaskOnFailException(errorMessage, true, error);
+ if (currentRetryCount >= getRetryCount() || fatal) {
+ ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
+ StringWriter errors = new StringWriter();
- status.setReason(errorMessage);
- errors.write(ExceptionUtils.getStackTrace(error));
- logger.error(errorMessage, error);
+ String errorCode = UUID.randomUUID().toString();
+ String errorMessage = "Error Code : " + errorCode + ", Task " + getTaskId() + " failed due to " + reason +
+ (error == null ? "" : ", " + error.getMessage());
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- if (getTaskContext() != null) { // task context could be null if the initialization failed
- getTaskContext().setProcessStatus(status);
- } else {
- logger.warn("Task context is null. So can not store the process status in the context");
- }
+ // wrapping from new error object with error code
+ error = new TaskOnFailException(errorMessage, true, error);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage(reason);
- errorModel.setActualErrorMessage(errors.toString());
- errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ status.setReason(errorMessage);
+ errors.write(ExceptionUtils.getStackTrace(error));
+ logger.error(errorMessage, error);
- if (!skipTaskStatusPublish) {
- publishTaskState(TaskState.FAILED);
- saveAndPublishProcessStatus(taskContext != null ? taskContext.getProcessStatus() : status);
- saveExperimentError(errorModel);
- saveProcessError(errorModel);
- saveTaskError(errorModel);
- }
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ if (getTaskContext() != null) { // task context could be null if the initialization failed
+ getTaskContext().setProcessStatus(status);
+ } else {
+ logger.warn("Task context is null. So can not store the process status in the context");
+ }
+
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage(reason);
+ errorModel.setActualErrorMessage(errors.toString());
+ errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- return onFail(errorMessage, fatal);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.FAILED);
+ saveAndPublishProcessStatus(taskContext != null ? taskContext.getProcessStatus() : status);
+ saveExperimentError(errorModel);
+ saveProcessError(errorModel);
+ saveTaskError(errorModel);
+ }
+ return onFail(errorMessage, fatal);
+ } else {
+ return onFail("Handover back to helix engine to retry", fatal);
+ }
}
protected void saveAndPublishProcessStatus(ProcessState state) {
@@ -254,7 +275,7 @@ public abstract class AiravataTask extends AbstractTask {
}
@SuppressWarnings("WeakerAccess")
- protected void saveExperimentError(ErrorModel errorModel) {
+ private void saveExperimentError(ErrorModel errorModel) {
try {
errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
getRegistryServiceClient().addErrors("EXPERIMENT_ERROR", errorModel, experimentId);
@@ -265,7 +286,7 @@ public abstract class AiravataTask extends AbstractTask {
}
@SuppressWarnings("WeakerAccess")
- protected void saveProcessError(ErrorModel errorModel) {
+ private void saveProcessError(ErrorModel errorModel) {
try {
errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
getRegistryServiceClient().addErrors("PROCESS_ERROR", errorModel, getProcessId());
@@ -275,7 +296,7 @@ public abstract class AiravataTask extends AbstractTask {
}
@SuppressWarnings("WeakerAccess")
- protected void saveTaskError(ErrorModel errorModel) {
+ private void saveTaskError(ErrorModel errorModel) {
try {
errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
getRegistryServiceClient().addErrors("TASK_ERROR", errorModel, getTaskId());
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
index 6be4383..d6b00c5 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
@@ -52,7 +52,7 @@ public class EnvSetupTask extends AiravataTask {
return onSuccess("Envi setup task successfully completed " + getTaskId());
} catch (Exception e) {
- return onFail("Failed to setup environment of task " + getTaskId(), true, e);
+ return onFail("Failed to setup environment of task " + getTaskId(), false, e);
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index fd55d70..fe57776 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -145,7 +145,7 @@ public abstract class DataStagingTask extends AiravataTask {
}
} catch (AgentException e) {
logger.error("Error while checking the file " + sourcePath + " existence");
- throw new TaskOnFailException("Error while checking the file " + sourcePath + " existence", true, e);
+ throw new TaskOnFailException("Error while checking the file " + sourcePath + " existence", false, e);
}
String localSourceFilePath = getLocalDataPath(fileName);
@@ -157,7 +157,7 @@ public abstract class DataStagingTask extends AiravataTask {
logger.info("Output file downloaded to " + localSourceFilePath);
} catch (AgentException e) {
throw new TaskOnFailException("Failed downloading output file " + sourcePath + " to the local path " +
- localSourceFilePath, true, e);
+ localSourceFilePath, false, e);
}
File localFile = new File(localSourceFilePath);
@@ -167,7 +167,7 @@ public abstract class DataStagingTask extends AiravataTask {
return false;
}
} else {
- throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, true, null);
+ throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
}
// Uploading output file to the storage resource
try {
@@ -176,7 +176,7 @@ public abstract class DataStagingTask extends AiravataTask {
logger.info("Output file uploaded to " + destPath);
} catch (AgentException e) {
throw new TaskOnFailException("Failed uploading the output file to " + destPath + " from local path " +
- localSourceFilePath, true, e);
+ localSourceFilePath, false, e);
}
return true;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index 71e6a54..298925d 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -100,7 +100,7 @@ public class InputDataStagingTask extends DataStagingTask {
storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
logger.info("Input file downloaded to " + localSourceFilePath);
} catch (AgentException e) {
- throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, true, e);
+ throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
}
File localFile = new File(localSourceFilePath);
@@ -110,7 +110,7 @@ public class InputDataStagingTask extends DataStagingTask {
return onFail("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
}
} else {
- throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, true, null);
+ throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
}
// Uploading input file to the compute resource
@@ -119,7 +119,7 @@ public class InputDataStagingTask extends DataStagingTask {
adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
logger.info("Input file uploaded to " + destinationURI.getPath());
} catch (AgentException e) {
- throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, true, e);
+ throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
}
} finally {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 200a073..1f66a1a 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -124,7 +124,7 @@ public class OutputDataStagingTask extends DataStagingTask {
}
} catch (AgentException e) {
- throw new TaskOnFailException("Failed to fetch the file list from extension " + sourceFileName, true, e);
+ throw new TaskOnFailException("Failed to fetch the file list from extension " + sourceFileName, false, e);
}
for (String temp : fileNames) {
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index f55fd48..0dabe59 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -22,6 +22,7 @@ package org.apache.airavata.helix.core;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.core.participant.HelixParticipant;
+import org.apache.airavata.helix.core.util.MonitoringUtil;
import org.apache.airavata.helix.core.util.TaskUtil;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
@@ -174,6 +175,15 @@ public abstract class AbstractTask extends UserContentStore implements Task {
return this;
}
+ protected int getCurrentRetryCount() throws Exception {
+ return MonitoringUtil.getTaskRetryCount(curatorClient, taskId);
+ }
+
+ protected void markNewRetry() throws Exception {
+ MonitoringUtil.increaseTaskRetryCount(curatorClient, taskId);
+ }
+
+
public int getRetryCount() {
return retryCount;
}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
index e427c16..b772d0a 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
@@ -27,9 +27,32 @@ public class MonitoringUtil {
private static final String JOB_ID = "/jobId";
private static final String JOB_NAME = "/jobName";
private static final String WORKFLOWS = "/workflows";
+ private static final String RETRY = "/retry";
public static final String CANCEL = "cancel";
+ public static int getTaskRetryCount(CuratorFramework curatorClient, String taskId) throws Exception {
+ String path = TASK + taskId + RETRY;
+ if (curatorClient.checkExists().forPath(path) != null) {
+ byte[] processBytes = curatorClient.getData().forPath(path);
+ return Integer.parseInt(new String(processBytes));
+ } else {
+ return 1;
+ }
+ }
+
+ public static void increaseTaskRetryCount(CuratorFramework curatorClient, String takId) throws Exception {
+ String path = TASK + takId + RETRY;
+ int currentRetryCount = 2;
+ if (curatorClient.checkExists().forPath(path) != null) {
+ byte[] processBytes = curatorClient.getData().forPath(path);
+ currentRetryCount = Integer.parseInt(new String(processBytes)) + 1;
+ curatorClient.delete().forPath(path);
+ }
+ curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ path , (currentRetryCount + "").getBytes());
+ }
+
public static String getExperimentIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
String path = MONITORING + jobId + EXPERIMENT;
if (curatorClient.checkExists().forPath(path) != null) {