You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/09/04 01:57:36 UTC

[1/2] airavata git commit: Experiment cancel request, Orchestrator side implementation and refactored zookeeper node paths. AIRAVATA-1798

Repository: airavata
Updated Branches:
  refs/heads/master 420a031a1 -> be9af52e2


Experiment cancel request, Orchestrator side implementation and refactored zookeeper node paths. AIRAVATA-1798


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e0483f37
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e0483f37
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e0483f37

Branch: refs/heads/master
Commit: e0483f37fc301d459212736d9b57adc3169666ba
Parents: 6623967
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Sep 3 19:57:17 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Sep 3 19:57:17 2015 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/zkConstants.java      |  32 ++++
 .../airavata/gfac/core/GFacConstants.java       |   8 +-
 .../apache/airavata/gfac/core/GFacUtils.java    |  15 +-
 .../impl/watcher/CancelRequestWatcherImpl.java  |   8 +-
 .../watcher/RedeliveryRequestWatcherImpl.java   |   2 +
 .../airavata/gfac/server/GfacServerHandler.java |  67 +++++---
 .../server/OrchestratorServerHandler.java       | 151 ++++---------------
 7 files changed, 122 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
new file mode 100644
index 0000000..9255e02
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.utils;
+
+public interface ZkConstants {
+
+	public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
+	public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
+	public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
+	public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
+	public static final String ZOOKEEPER_TOKEN_NODE = "/token";
+	public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
+	public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
index b662fff..444956b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -50,13 +50,7 @@ public class GFacConstants {
 	public static final String _127_0_0_1 = "127.0.0.1";
 	public static final String LOCALHOST = "localhost";
 
-	public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
-	public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
-	public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
-	public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
-	public static final String ZOOKEEPER_TOKEN_NODE = "/token";
-	public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
-	public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+
 
 	public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
 	public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 784d214..1fca2d0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.DBUtil;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -551,7 +552,7 @@ public class GFacUtils {
      * @throws InterruptedException
      */
     public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
-        String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+        String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
         List<String> children = curatorClient.getChildren().forPath(experimentNode);
         for (String pickedChild : children) {
             String experimentPath = experimentNode + File.separator + pickedChild;
@@ -568,9 +569,9 @@ public class GFacUtils {
 
     public static boolean setExperimentCancelRequest(String processId, CuratorFramework curatorClient, long
 		    deliveryTag) throws Exception {
-	    String experimentNode = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-	    String cancelListenerNodePath = ZKPaths.makePath(experimentNode, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-	    curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, GFacConstants.ZOOKEEPER_CANCEL_REQEUST
+	    String experimentNode = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+	    String cancelListenerNodePath = ZKPaths.makePath(experimentNode, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+	    curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
 			    .getBytes());
 	    return true;
     }
@@ -768,7 +769,7 @@ public class GFacUtils {
 //    }
 
     public static String getZKGfacServersParentPath() {
-        return ZKPaths.makePath(GFacConstants.ZOOKEEPER_SERVERS_NODE, GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE);
+        return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE);
     }
 
     public static JobDescriptor createJobDescriptor(ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
@@ -1113,11 +1114,11 @@ public class GFacUtils {
     }
 
 	public static String getExperimentNodePath(String experimentId) {
-		return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
+		return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
 	}
 
 	public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId) throws Exception {
-		String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
+		String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + ZkConstants
 				.ZOOKEEPER_DELIVERYTAG_NODE;
 		byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
 		return GFacUtils.bytesToLong(bytes);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index bfeac89..58d2817 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.airavata.gfac.impl.watcher;
 
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.impl.Factory;
@@ -44,7 +45,7 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 				byte[] bytes = curatorClient.getData().forPath(path);
 				String processId = path.substring(path.lastIndexOf("/") + 1);
 				String action = new String(bytes);
-				if (action.equalsIgnoreCase("CANCEL")) {
+				if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
 					ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
 					if (processContext != null) {
 						processContext.setCancel(true);
@@ -56,9 +57,12 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 					curatorClient.getData().usingWatcher(this).forPath(path);
 				}
 				break;
+			case NodeDeleted:
+				//end of experiment execution, ignore this event
+				break;
 			case NodeCreated:
 			case NodeChildrenChanged:
-			case NodeDeleted:
+			case None:
 				curatorClient.getData().usingWatcher(this).forPath(path);
 				break;
 			default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
index 4738edb..dc5317f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -58,6 +58,8 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
 				}
 				break;
 			case NodeDeleted:
+				//end of experiment execution, ignore this event
+				break;
 			case NodeCreated:
 			case NodeChildrenChanged:
 			case None:

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9ebfa05..1040b05 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
@@ -54,6 +55,7 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,7 +118,7 @@ public class GfacServerHandler implements GfacService.Iface {
         airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
         // create PERSISTENT nodes
         ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath());
-        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), ZkConstants.ZOOKEEPER_EXPERIMENT_NODE);
         // create EPHEMERAL server name node
         String gfacName = ServerSettings.getGFacServerName();
         if (curatorClient.checkExists().forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath() ,gfacName)) == null) {
@@ -196,7 +198,7 @@ public class GfacServerHandler implements GfacService.Iface {
         private String gfacServerName;
 
         public ProcessLaunchMessageHandler() throws ApplicationSettingsException {
-            experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+            experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
             gfacServerName = ServerSettings.getGFacServerName();
         }
 
@@ -226,8 +228,7 @@ public class GfacServerHandler implements GfacService.Iface {
 		                if (Factory.getGfacContext().getProcess(event.getProcessId()) != null) {
 			                // update deliver tag
 			                try {
-				                updateDeliveryTag(curatorClient, gfacServerName, event.getProcessId(), message
-						                .getDeliveryTag());
+				                updateDeliveryTag(curatorClient, gfacServerName, event, message );
 				                return;
 			                } catch (Exception e) {
 				                log.error("Error while updating delivery tag for redelivery message , messageId : " +
@@ -254,8 +255,7 @@ public class GfacServerHandler implements GfacService.Iface {
 			                .getProcessId());
 	                publishProcessStatus(event, status);
                     try {
-	                    createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message
-			                    .getDeliveryTag(), event.getTokenId());
+	                    createProcessZKNode(curatorClient, gfacServerName, event, message);
 	                    submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
@@ -306,38 +306,55 @@ public class GfacServerHandler implements GfacService.Iface {
 		statusPublisher.publish(msgCtx);
 	}
 
-	private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String
-			processId, long deliveryTag, String token) throws Exception {
-		// TODO - To handle multiple processes per experiment, need to create a /experiments/{expId}/{processId} node
-		// create /experiments/{processId} node and set data - serverName, add redelivery listener
-		String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+	private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName,ProcessSubmitEvent event
+			,MessageContext messageContext) throws Exception {
+		String processId  = event.getProcessId();
+		String token = event.getTokenId();
+		String experimentId = event.getExperimentId();
+		long deliveryTag = messageContext.getDeliveryTag();
+
+		// create /experiments/{experimentId}/{processId} node and set data - serverName, add redelivery listener
+		String experimentNodePath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId;
+		String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
 		curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
 		curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath);
 
-		// create /experiments/{processId}/deliveryTag node and set data - deliveryTag
-		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+		// create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag
+		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);
 		curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
 
-		// create /experiments/{processId}/token node and set data - token
-		String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE);
+		// create /experiments/{experimentId}/{processId}/token node and set data - token
+		String tokenNodePath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_TOKEN_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath);
 		curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes());
 
-		// create /experiments/{processId}/cancelListener node and set watcher for data changes
-		String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+		// create /experiments/{experimentId}/{processId}/cancelListener node and set watcher for data changes
+/*		String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode);
-		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);
+		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/
+
+		// create /experiments/{experimentId}/cancelListener node (the constant already starts with "/", so join via makePath to avoid "//") and set watcher for data changes
+		String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
+		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(experimentCancelNode);
+
 	}
 
-	private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, String processId, long
-			deliveryTag) throws Exception {
-		// create /experiments/{processId} node and set data - serverName, add redelivery listener
-		String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-		// create /experiments/{processId}/deliveryTag node and set data - deliveryTag
-		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
-		curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
+	private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent event,
+	                               MessageContext messageContext) throws Exception {
+		String experimentId = event.getExperimentId();
+		String processId = event.getProcessId();
+		long deliveryTag = messageContext.getDeliveryTag();
+		String processNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+				experimentId), processId);
+		Stat stat = curatorClient.checkExists().forPath(processNodePath);
+		if (stat != null) {
+			// process node exists: overwrite data of /experiments/{experimentId}/{processId}/deliveryTag with the new deliveryTag
+			String deliveryTagPath = ZKPaths.makePath(processNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+			curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 8427d0c..acb5530 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
@@ -59,8 +60,14 @@ import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractRes
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
 import org.apache.airavata.registry.cpi.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +87,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private String gatewayName;
 	private Publisher publisher;
 	private RabbitMQStatusConsumer statusConsumer;
+	private CuratorFramework curatorClient;
 
     /**
 	 * Query orchestrator server to fetch the CPI version
@@ -109,6 +117,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 			String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
 			statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
 			statusConsumer.listen(new ProcessStatusHandler());
+			startCurator();
 		} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) {
 			log.error(e.getMessage(), e);
 			throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -209,7 +218,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	 */
 	public boolean terminateExperiment(String experimentId, String tokenId) throws TException {
         log.info(experimentId, "Experiment: {} is cancelling  !!!!!", experimentId);
-        return validateStatesAndCancel(experimentId, tokenId);
+		try {
+			return validateStatesAndCancel(experimentId, tokenId);
+		} catch (Exception e) {
+			log.error("expId : " + experimentId + " :- Error while cancelling experiment", e);
+			return false;
+		}
 	}
 
 	private String getAiravataUserName() {
@@ -277,7 +291,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
         List<ComputeResourceDescription> computeHostList = Arrays.asList(deploymentMap.keySet().toArray(new ComputeResourceDescription[]{}));
         Class<? extends HostScheduler> aClass = Class.forName(
                 ServerSettings.getHostScheduler()).asSubclass(
-                HostScheduler.class);
+		        HostScheduler.class);
         HostScheduler hostScheduler = aClass.newInstance();
         ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList);
         return deploymentMap.get(ComputeResourceDescription);
@@ -297,124 +311,15 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 		return selectedModuleId;
 	}
 
-    private boolean validateStatesAndCancel(String experimentId, String tokenId)throws TException{
-        // FIXME
-//        try {
-//            Experiment experiment = (Experiment) experimentCatalog.get(
-//                    ExperimentCatalogModelType.EXPERIMENT, experimentId);
-//			log.info("Waiting for zookeeper to connect to the server");
-//			synchronized (mutex){
-//				mutex.wait(5000);
-//			}
-//            if (experiment == null) {
-//                log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", experimentId);
-//                throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId);
-//            }
-//            ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
-//            if (isCancelValid(experimentState)){
-//                ExperimentStatus status = new ExperimentStatus();
-//                status.setExperimentState(ExperimentState.CANCELING);
-//                status.setTimeOfStateChange(Calendar.getInstance()
-//                        .getTimeInMillis());
-//                experiment.setExperimentStatus(status);
-//                experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-//                        experimentId);
-//
-//                List<String> ids = experimentCatalog.getIds(
-//                        ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                        WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-//                for (String workflowNodeId : ids) {
-//                    WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) experimentCatalog
-//                            .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                                    workflowNodeId);
-//                    int value = workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue();
-//                    if ( value> 1 && value < 7) { // we skip the unknown state
-//                        log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot mark as cancelled, because " +
-//                                "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
-//                        continue; // this continue is very useful not to process deeper loops if the upper layers have non-cancel states
-//                    } else {
-//                        WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-//                        workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
-//                        workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                .getTimeInMillis());
-//                        workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-//                        experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
-//                                workflowNodeId);
-//                    }
-//                    List<Object> taskDetailList = experimentCatalog.get(
-//                            ExperimentCatalogModelType.TASK_DETAIL,
-//                            TaskDetailConstants.NODE_ID, workflowNodeId);
-//                    for (Object o : taskDetailList) {
-//                        TaskDetails taskDetails = (TaskDetails) o;
-//                        TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-//                        if (taskStatus.getExecutionState().getValue() > 7 && taskStatus.getExecutionState().getValue()<12) {
-//                            log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark as cancelled, because " +
-//                                    "current task state is " + ((TaskDetails) o).getTaskStatus().getExecutionState().toString());
-//                            continue;// this continue is very useful not to process deeper loops if the upper layers have non-cancel states
-//                        } else {
-//                            taskStatus.setExecutionState(TaskState.CANCELING);
-//                            taskStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                    .getTimeInMillis());
-//                            taskDetails.setTaskStatus(taskStatus);
-//                            experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o,
-//                                    taskDetails.getTaskID());
-//                        }
-//                        orchestrator.cancelExperiment(experiment,
-//                                workflowNodeDetail, taskDetails, tokenId);
-//                        // Status update should be done at the monitor
-//                    }
-//                }
-//            }else {
-//                if (isCancelAllowed(experimentState)){
-//                    // when experiment status is < 3 no jobDetails object is created,
-//                    // so we don't have to worry, we simply have to change the status and stop the execution
-//                    ExperimentStatus status = new ExperimentStatus();
-//                    status.setExperimentState(ExperimentState.CANCELED);
-//                    status.setTimeOfStateChange(Calendar.getInstance()
-//                            .getTimeInMillis());
-//                    experiment.setExperimentStatus(status);
-//                    experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-//                            experimentId);
-//                    List<String> ids = experimentCatalog.getIds(
-//                            ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                            WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-//                    for (String workflowNodeId : ids) {
-//                        WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) experimentCatalog
-//                                .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                                        workflowNodeId);
-//                        WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-//                        workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
-//                        workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                .getTimeInMillis());
-//                        workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-//                        experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
-//                                workflowNodeId);
-//                        List<Object> taskDetailList = experimentCatalog.get(
-//                                ExperimentCatalogModelType.TASK_DETAIL,
-//                                TaskDetailConstants.NODE_ID, workflowNodeId);
-//                        for (Object o : taskDetailList) {
-//                            TaskDetails taskDetails = (TaskDetails) o;
-//                            TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-//                            taskStatus.setExecutionState(TaskState.CANCELED);
-//                            taskStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                    .getTimeInMillis());
-//                            taskDetails.setTaskStatus(taskStatus);
-//                            experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o,
-//                                    taskDetails);
-//                        }
-//                    }
-//                }else {
-//                    log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.",
-//                            experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
-//                    throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: "
-//                            + experiment.getExperimentStatus().getExperimentState().toString());
-//                }
-//            }
-//            log.info("Experiment: " + experimentId + " is cancelled !!!!!");
-//        } catch (Exception e) {
-//            throw new TException(e);
-//        }
-        return true;
+    private boolean validateStatesAndCancel(String experimentId, String tokenId) throws Exception {
+	    // Write the cancel marker to /experiments/{experimentId}/cancelListener if it exists; result is true either way.
+	    String experimentPath = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId);
+	    String cancelPath = ZKPaths.makePath(experimentPath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+	    if (curatorClient.checkExists().forPath(cancelPath) != null) {
+		    byte[] cancelPayload = ZkConstants.ZOOKEEPER_CANCEL_REQEUST.getBytes();
+		    curatorClient.setData().withVersion(-1).forPath(cancelPath, cancelPayload);
+	    }
+	    return true;
+    }
 
     private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
@@ -427,6 +332,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 //        }
     }
 
+	private void startCurator() throws ApplicationSettingsException {
+		String connectionSting = ServerSettings.getZookeeperConnection();
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+		curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+		curatorClient.start();
+	}
     private class SingleAppExperimentRunner implements Runnable {
 
         String experimentId;


[2/2] airavata git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata

Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/be9af52e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/be9af52e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/be9af52e

Branch: refs/heads/master
Commit: be9af52e246ba5f91fba83d4b95bc21600a490d0
Parents: e0483f3 420a031
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Sep 3 19:57:26 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Sep 3 19:57:26 2015 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |   3 +-
 .../java/org/apache/airavata/api/Airavata.java  | 141 ++-----------------
 .../main/resources/lib/airavata/Airavata.cpp    |  35 +----
 .../src/main/resources/lib/airavata/Airavata.h  |  32 ++---
 .../lib/airavata/Airavata_server.skeleton.cpp   |   4 +-
 .../resources/lib/Airavata/API/Airavata.php     |  42 +-----
 .../lib/apache/airavata/api/Airavata-remote     |   8 +-
 .../lib/apache/airavata/api/Airavata.py         |  39 +----
 .../client/samples/CreateLaunchBES.java         |   2 +-
 .../client/samples/CreateLaunchExperiment.java  |   2 +-
 .../samples/CreateLaunchExperimentUS3.java      |   4 +-
 .../tools/RegisterOGCEUS3Application.java       |  20 +--
 .../tools/RegisterSampleApplications.java       |  14 +-
 .../client/tools/RegisterUS3Application.java    |  20 +--
 .../org/apache/airavata/model/Workflow.java     |   2 +-
 .../ApplicationDeploymentDescription.java       |   2 +-
 .../appdeployment/ApplicationModule.java        |   2 +-
 .../appcatalog/appdeployment/SetEnvPaths.java   |   2 +-
 .../ApplicationInterfaceDescription.java        |   2 +-
 .../appcatalog/computeresource/BatchQueue.java  |   2 +-
 .../computeresource/CloudJobSubmission.java     |   2 +-
 .../ComputeResourceDescription.java             |   2 +-
 .../computeresource/DataMovementInterface.java  |   2 +-
 .../computeresource/GlobusJobSubmission.java    |   2 +-
 .../computeresource/GridFTPDataMovement.java    |   2 +-
 .../computeresource/JobSubmissionInterface.java |   2 +-
 .../computeresource/LOCALDataMovement.java      |   2 +-
 .../computeresource/LOCALSubmission.java        |   2 +-
 .../computeresource/ResourceJobManager.java     |   2 +-
 .../computeresource/SCPDataMovement.java        |   2 +-
 .../computeresource/SSHJobSubmission.java       |   2 +-
 .../computeresource/UnicoreDataMovement.java    |   2 +-
 .../computeresource/UnicoreJobSubmission.java   |   2 +-
 .../ComputeResourcePreference.java              |   2 +-
 .../gatewayprofile/GatewayResourceProfile.java  |   2 +-
 .../application/io/InputDataObjectType.java     |   2 +-
 .../application/io/OutputDataObjectType.java    |   2 +-
 .../airavata/model/commons/ErrorModel.java      |   2 +-
 .../model/commons/ValidationResults.java        |   2 +-
 .../airavata/model/commons/ValidatorResult.java |   2 +-
 .../model/error/AiravataClientException.java    |   2 +-
 .../model/error/AiravataSystemException.java    |   2 +-
 .../model/error/AuthenticationException.java    |   2 +-
 .../model/error/AuthorizationException.java     |   2 +-
 .../error/ExperimentNotFoundException.java      |   2 +-
 .../model/error/InvalidRequestException.java    |   2 +-
 .../model/error/LaunchValidationException.java  |   2 +-
 .../model/error/ProjectNotFoundException.java   |   2 +-
 .../airavata/model/error/TimedOutException.java |   2 +-
 .../airavata/model/error/ValidationResults.java |   2 +-
 .../airavata/model/error/ValidatorResult.java   |   2 +-
 .../model/experiment/ExperimentModel.java       |   2 +-
 .../model/experiment/ExperimentStatistics.java  |   2 +-
 .../experiment/ExperimentSummaryModel.java      |   2 +-
 .../experiment/UserConfigurationDataModel.java  |   2 +-
 .../org/apache/airavata/model/job/JobModel.java |   2 +-
 .../event/ExperimentStatusChangeEvent.java      |   2 +-
 .../model/messaging/event/JobIdentifier.java    |   2 +-
 .../messaging/event/JobStatusChangeEvent.java   |   2 +-
 .../event/JobStatusChangeRequestEvent.java      |   2 +-
 .../airavata/model/messaging/event/Message.java |   2 +-
 .../messaging/event/ProcessIdentifier.java      |   2 +-
 .../event/ProcessStatusChangeEvent.java         |   2 +-
 .../event/ProcessStatusChangeRequestEvent.java  |   2 +-
 .../messaging/event/ProcessSubmitEvent.java     |   2 +-
 .../messaging/event/ProcessTerminateEvent.java  |   2 +-
 .../model/messaging/event/TaskIdentifier.java   |   2 +-
 .../messaging/event/TaskOutputChangeEvent.java  |   2 +-
 .../messaging/event/TaskStatusChangeEvent.java  |   2 +-
 .../event/TaskStatusChangeRequestEvent.java     |   2 +-
 .../airavata/model/process/ProcessModel.java    |   2 +-
 .../ComputationalResourceSchedulingModel.java   |   2 +-
 .../airavata/model/security/AuthzToken.java     |   2 +-
 .../airavata/model/status/ExperimentStatus.java |   2 +-
 .../apache/airavata/model/status/JobStatus.java |   2 +-
 .../airavata/model/status/ProcessStatus.java    |   2 +-
 .../airavata/model/status/TaskStatus.java       |   2 +-
 .../model/task/DataStagingTaskModel.java        |   2 +-
 .../apache/airavata/model/task/TaskModel.java   |   2 +-
 .../airavata/model/workspace/Gateway.java       |   2 +-
 .../apache/airavata/model/workspace/Group.java  |   2 +-
 .../airavata/model/workspace/Project.java       |   2 +-
 .../apache/airavata/model/workspace/User.java   |   2 +-
 .../apache/airavata/gfac/core/GFacUtils.java    |  84 +++++++++++
 .../airavata/gfac/core/context/TaskContext.java |  20 +++
 .../airavata/gfac/impl/GFacEngineImpl.java      |   2 +
 .../impl/task/AdvancedSCPDataStageTask.java     |  23 ++-
 .../catalog/impl/ExperimentRegistry.java        |   2 +-
 .../airavata-api/airavata_api.thrift            |   2 +-
 89 files changed, 266 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/be9af52e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------