You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/01/13 18:03:07 UTC
[30/42] airavata git commit: Fixing issues with cancelation with
recovery
Fixing issues with cancelation with recovery
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d338d07
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d338d07
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d338d07
Branch: refs/heads/develop
Commit: 6d338d07f039bcc30de2d9324a8a37f268ea3b2b
Parents: 0b75afd
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Thu Jan 7 18:32:18 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Thu Jan 7 18:32:18 2016 -0500
----------------------------------------------------------------------
.../gfac/core/context/ProcessContext.java | 9 +++
.../airavata/gfac/impl/GFacEngineImpl.java | 75 +++++++++++++++++++-
.../airavata/gfac/server/GfacServerHandler.java | 50 +++++++++----
.../server/OrchestratorServerHandler.java | 15 ++--
4 files changed, 131 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 02d4cbc..2880551 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -97,6 +97,7 @@ public class ProcessContext {
private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again
private boolean acknowledge;
private SSHKeyAuthentication sshKeyAuthentication;
+ private boolean recoveryWithCancel = false; // true when the process cancel ZooKeeper node held a cancel request; consulted when recovering the process
/**
* Note: process context property use lazy loading approach. In runtime you will see some properties as null
@@ -501,5 +502,13 @@ public class ProcessContext {
public void setSshKeyAuthentication(SSHKeyAuthentication sshKeyAuthentication) {
this.sshKeyAuthentication = sshKeyAuthentication;
}
+
+ public boolean isRecoveryWithCancel() { // whether a cancel request was found in ZooKeeper for this process, so recovery must also cancel its job
+ return recoveryWithCancel;
+ }
+
+ public void setRecoveryWithCancel(boolean recoveryWithCancel) { // set to true once the process cancel node is found to contain a cancel request
+ this.recoveryWithCancel = recoveryWithCancel;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index dd10c12..00d920d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
@@ -61,6 +62,9 @@ import org.apache.airavata.registry.cpi.ExpCatChildDataType;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.utils.Constants;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +95,14 @@ public class GFacEngineImpl implements GFacEngine {
ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
processContext.setProcessModel(processModel);
+
+ try {
+ checkRecoveryWithCancel(processContext);
+ } catch (Exception e) {
+ log.error("expId: {}, processId: {}, Error while checking process cancel data in zookeeper",
+ processContext.getExperimentId(), processContext.getProcessId());
+ }
+
GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
processContext.setGatewayResourceProfile(gatewayProfile);
processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
@@ -176,6 +188,19 @@ public class GFacEngineImpl implements GFacEngine {
}
}
+ private void checkRecoveryWithCancel(ProcessContext processContext) throws Exception { // flags the context when the process cancel ZK node holds a cancel request
+ CuratorFramework curatorClient = processContext.getCuratorClient();
+ String experimentId = processContext.getExperimentId();
+ String processId = processContext.getProcessId();
+ String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZKPaths.makePath(
+ ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ log.info("expId: {}, processId: {}, get process cancel data from zookeeper node {}", experimentId, processId, processCancelNodePath);
+ if (curatorClient.checkExists().forPath(processCancelNodePath) == null) { return; } // absent node = no cancel request; avoids NoNodeException from getData
+ byte[] bytes = curatorClient.getData().forPath(processCancelNodePath);
+ String cancelData = bytes == null ? null : new String(bytes, java.nio.charset.StandardCharsets.UTF_8); // explicit charset instead of platform default
+ if (ZkConstants.ZOOKEEPER_CANCEL_REQEUST.equalsIgnoreCase(cancelData)) { processContext.setRecoveryWithCancel(true); }
+ }
+
@Override
public void executeProcess(ProcessContext processContext) throws GFacException {
if (processContext.isInterrupted()) {
@@ -509,6 +534,7 @@ public class GFacEngineImpl implements GFacEngine {
processContext.setTaskExecutionOrder(taskExecutionOrder);
Map<String, TaskModel> taskMap = processContext.getTaskMap();
String recoverTaskId = null;
+ String previousTaskId = null;
TaskModel taskModel = null;
for (String taskId : taskExecutionOrder) {
taskModel = taskMap.get(taskId);
@@ -517,9 +543,56 @@ public class GFacEngineImpl implements GFacEngine {
recoverTaskId = taskId;
break;
}
+ previousTaskId = taskId;
+ }
+ final String rTaskId = recoverTaskId;
+ final String pTaskId = previousTaskId;
+ if (recoverTaskId != null) {
+ if (processContext.isRecoveryWithCancel()) {
+ cancelJobSubmission(processContext, rTaskId, pTaskId);
+ }
+ continueProcess(processContext, recoverTaskId);
}
+ }
- continueProcess(processContext, recoverTaskId);
+ private void cancelJobSubmission(ProcessContext processContext, String rTaskId, String pTaskId) { // cancels the recovered process's job on a separate thread
+ new Thread(() -> {
+ String jobTaskId = null; // task whose job models are fetched; also reported if that lookup fails
+ try {
+ processContext.setCancel(true);
+ ProcessState processState = processContext.getProcessState();
+ switch (processState) {
+ case EXECUTING: // look up jobs of the task being recovered
+ jobTaskId = rTaskId;
+ break;
+ case MONITORING: // look up jobs of the task preceding the recovered one
+ jobTaskId = pTaskId;
+ break;
+ }
+ List<Object> jobModels = null;
+ if (jobTaskId != null) {
+ jobModels = processContext.getExperimentCatalog().get(
+ ExperimentCatalogModelType.JOB, Constants.FieldConstants.TaskConstants.TASK_ID, jobTaskId);
+ }
+
+ if (jobModels != null && !jobModels.isEmpty()) {
+ JobModel jobModel = (JobModel) jobModels.get(jobModels.size() - 1); // last job model returned for the task
+ processContext.setJobModel(jobModel);
+ log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(),
+ processContext.getProcessId(), jobModel.getJobId());
+ cancelProcess(processContext);
+ log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(),
+ processContext.getProcessId(), jobModel.getJobId());
+ }
+ } catch (GFacException e) {
+ log.error("expId: {}, processId: {}, Error while canceling process which is in recovery mode",
+ processContext.getExperimentId(), processContext.getProcessId(), e);
+ } catch (RegistryException e) {
+ log.error("expId: {}, processId: {}, Error while getting job model for taskId {}, " +
+ "couldn't cancel process which is in recovery mode", processContext.getExperimentId(),
+ processContext.getProcessId(), jobTaskId, e);
+ }
+ }).start();
}
private JobModel getJobModel(ProcessContext processContext) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 10da052..c59e199 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,6 +28,7 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.gfac.core.GFac;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.cpi.GfacService;
@@ -247,14 +248,29 @@ public class GfacServerHandler implements GfacService.Iface {
.getProcessId());
publishProcessStatus(event, status);
try {
- createProcessZKNode(curatorClient, gfacServerName, event, message);
+ createProcessZKNode(curatorClient, gfacServerName, event, message);
boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId());
- submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
if (isCancel) {
- // Need to trigger process cancel watcher, wait till process recover and then set zk data.
- Thread.sleep(10000);
- setCancelData(event.getExperimentId());
+ if (status.getState() == ProcessState.STARTED) {
+ status.setState(ProcessState.CANCELLING);
+ status.setReason("Process Cancel is triggered");
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
+ publishProcessStatus(event, status);
+
+ // TODO: no cancel operation is performed yet; the process is only marked CANCELED and is never submitted
+
+ status.setState(ProcessState.CANCELED);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
+ publishProcessStatus(event, status);
+ rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+ return;
+ } else {
+ setCancelData(event.getExperimentId(),event.getProcessId());
+ }
}
+ submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
} catch (Exception e) {
log.error(e.getMessage(), e);
rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
@@ -295,10 +311,11 @@ public class GfacServerHandler implements GfacService.Iface {
}
}
- private void setCancelData(String experimentId) throws Exception {
- String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
- experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
- curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
+ private void setCancelData(String experimentId, String processId) throws Exception { // writes the cancel request marker into the process-level cancel listener node
+ String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZKPaths.makePath(
+ ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ log.info("expId: {}, processId: {}, set process cancel data to zookeeper node {}", experimentId, processId, processCancelNodePath);
+ curatorClient.setData().withVersion(-1).forPath(processCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
.getBytes());
}
@@ -307,10 +324,15 @@ public class GfacServerHandler implements GfacService.Iface {
String processId) throws Exception {
String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId);
- // create /experiments/{experimentId}/cancel node and set watcher for data changes
+ // /experiments/{experimentId}/cancelListener, set watcher for data changes
String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
- byte[] bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode);
- return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST);
+ byte[] bytes = curatorClient.getData().forPath(experimentCancelNode);
+ if (bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
+ return true;
+ } else {
+ bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode);
+ return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST);
+ }
}
@@ -339,6 +361,10 @@ public class GfacServerHandler implements GfacService.Iface {
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId, processId)).forPath(zkProcessNodePath);
+ // create /experiments/{experimentId}/{processId}/cancelListener node
+ String zkProcessCancelPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessCancelPath);
+
// create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag
String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 9e39c47..2cda709 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -408,16 +408,21 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private boolean launchSingleAppExperiment() throws TException {
try {
- List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS, AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId);
+ List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS,
+ AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId);
for (String processId : processIds) {
launchProcess(processId, airavataCredStoreToken, gatewayId);
}
-
- } catch (Exception e) {
+ ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
+ status.setReason("submitted all processes");
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ OrchestratorUtils.updageExperimentStatus(experimentId, status);
+ log.info("expId: {}, Launched experiment ", experimentId);
+ } catch (Exception e) {
ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED);
status.setReason("Error while updating task status");
OrchestratorUtils.updageExperimentStatus(experimentId, status);
- log.error(experimentId, "Error while updating task status, hence updated experiment status to " +
+ log.error("expId: " + experimentId + ", Error while updating task status, hence updated experiment status to " +
ExperimentState.FAILED, e);
throw new TException(e);
}
@@ -465,7 +470,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
.getExperimentId());
if (stat.getState() == ExperimentState.CANCELING) {
status.setState(ExperimentState.CANCELING);
- status.setReason("Process competed but experiment cancelling is triggered");
+ status.setReason("Process started but experiment cancelling is triggered");
} else {
status.setState(ExperimentState.EXECUTING);
status.setReason("process started");