You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sc...@apache.org on 2016/01/12 23:04:34 UTC

[23/50] [abbrv] airavata git commit: Fixed AIRAVATA-1888

Fixed AIRAVATA-1888


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5ce750d5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5ce750d5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5ce750d5

Branch: refs/heads/data-manager
Commit: 5ce750d53bc4e8def467b9024c491a463e62203d
Parents: fc3f979
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Jan 6 14:15:35 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Jan 6 14:15:35 2016 -0500

----------------------------------------------------------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      |  3 +-
 .../apache/airavata/gfac/impl/GFacWorker.java   | 32 +++-----------
 .../airavata/gfac/server/GfacServerHandler.java | 44 +++++++++++++-------
 3 files changed, 38 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 9e6a522..dd10c12 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -509,8 +509,9 @@ public class GFacEngineImpl implements GFacEngine {
         processContext.setTaskExecutionOrder(taskExecutionOrder);
         Map<String, TaskModel> taskMap = processContext.getTaskMap();
         String recoverTaskId = null;
+        TaskModel taskModel = null;
         for (String taskId : taskExecutionOrder) {
-            TaskModel taskModel = taskMap.get(taskId);
+            taskModel = taskMap.get(taskId);
             TaskState state = taskModel.getTaskStatus().getState();
             if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
                 recoverTaskId = taskId;

http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index c701ed5..e0664a5 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -78,14 +78,6 @@ public class GFacWorker implements Runnable {
 	@Override
 	public void run() {
 		try {
-			if (processContext.isInterrupted()) {
-				GFacUtils.handleProcessInterrupt(processContext);
-				if (processContext.isCancel()) {
-					sendAck();
-					Factory.getGfacContext().removeProcess(processContext.getProcessId());
-				}
-				return;
-			}
 			ProcessState processState = processContext.getProcessStatus().getState();
 			switch (processState) {
 				case CREATED:
@@ -206,7 +198,13 @@ public class GFacWorker implements Runnable {
     }
 
 	private void executeProcess() throws GFacException {
+		// checkpoint
+		if (processContext.isInterrupted()) {
+			return;
+		}
+
 		engine.executeProcess(processContext);
+		// checkpoint
 		if (processContext.isInterrupted()) {
 			return;
 		}
@@ -216,24 +214,6 @@ public class GFacWorker implements Runnable {
         }
 	}
 
-//	private void monitorProcess() throws GFacException {
-//		try {
-//			JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
-//			if (monitorService != null) {
-//				monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
-//                ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
-//                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-//                processContext.setProcessStatus(status);
-//				GFacUtils.saveAndPublishProcessStatus(processContext);
-//			} else {
-//				// we directly invoke outflow
-//				continueTaskExecution();
-//			}
-//		} catch (AiravataException e) {
-//			throw new GFacException("Error while retrieving moniot service", e);
-//		}
-//	}
-
 	private void sendAck() {
 		// this ensure, gfac doesn't send ack more than once for a process. which cause to remove gfac rabbitmq consumer from rabbitmq server.
 		if (!processContext.isAcknowledge()) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c1c08a5..10da052 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,10 +28,8 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpiConstants;
 import org.apache.airavata.gfac.impl.Factory;
@@ -250,7 +248,13 @@ public class GfacServerHandler implements GfacService.Iface {
 	                publishProcessStatus(event, status);
                     try {
 	                    createProcessZKNode(curatorClient, gfacServerName, event, message);
-	                    submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
+                        boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId());
+                        submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
+                        if (isCancel) {
+                            // A cancel request was already pending: wait for the process to recover, then rewrite the zk data to trigger the cancel watcher.
+                            Thread.sleep(10000);
+                            setCancelData(event.getExperimentId());
+                        }
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
                         rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
@@ -262,7 +266,9 @@ public class GfacServerHandler implements GfacService.Iface {
                 } catch (AiravataException e) {
 	                log.error("Error while publishing process status", e);
                 }
-            } else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
+            }
+            // TODO - Process termination messages are no longer sent; termination now goes through ZooKeeper instead of RabbitMQ, so this else branch can safely be removed.
+            else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
                 ProcessTerminateEvent event = new ProcessTerminateEvent();
                 TBase messageEvent = message.getEvent();
                 try {
@@ -289,7 +295,26 @@ public class GfacServerHandler implements GfacService.Iface {
         }
     }
 
-	private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException {
+    private void setCancelData(String experimentId) throws Exception {
+        String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+                experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
+                .getBytes());
+    }
+
+    private boolean setCancelWatcher(CuratorFramework curatorClient,
+                                     String experimentId,
+                                     String processId) throws Exception {
+
+        String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId);
+        // read the /experiments/{experimentId}/cancel node data and set a watcher for data changes
+        String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        byte[] bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode);
+        return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST);
+
+    }
+
+    private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException {
 		ProcessIdentifier identifier = new ProcessIdentifier(event.getProcessId(),
 				event.getExperimentId(),
 				event.getGatewayId());
@@ -324,15 +349,6 @@ public class GfacServerHandler implements GfacService.Iface {
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath);
 		curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes());
 
-		// create /experiments/{experimentId}/{processId}/cancelListener node and set watcher for data changes
-/*		String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode);
-		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/
-
-		// create /experiments/{experimentId}/cancel node and set watcher for data changes
-		String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath (experimentCancelNode);
-
 	}
 
 	private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent event,