You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/09/04 01:57:36 UTC
[1/2] airavata git commit: Experiment cancel request,
Orchestrator side implementation and refactored zookeeper node paths.
AIRAVATA-1798
Repository: airavata
Updated Branches:
refs/heads/master 420a031a1 -> be9af52e2
Experiment cancel request, Orchestrator side implementation and refactored zookeeper node paths. AIRAVATA-1798
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e0483f37
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e0483f37
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e0483f37
Branch: refs/heads/master
Commit: e0483f37fc301d459212736d9b57adc3169666ba
Parents: 6623967
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Sep 3 19:57:17 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Sep 3 19:57:17 2015 -0400
----------------------------------------------------------------------
.../airavata/common/utils/zkConstants.java | 32 ++++
.../airavata/gfac/core/GFacConstants.java | 8 +-
.../apache/airavata/gfac/core/GFacUtils.java | 15 +-
.../impl/watcher/CancelRequestWatcherImpl.java | 8 +-
.../watcher/RedeliveryRequestWatcherImpl.java | 2 +
.../airavata/gfac/server/GfacServerHandler.java | 67 +++++---
.../server/OrchestratorServerHandler.java | 151 ++++---------------
7 files changed, 122 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
new file mode 100644
index 0000000..9255e02
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.utils;
+
+public interface ZkConstants {
+
+ public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
+ public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
+ public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
+ public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
+ public static final String ZOOKEEPER_TOKEN_NODE = "/token";
+ public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
+ public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
index b662fff..444956b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -50,13 +50,7 @@ public class GFacConstants {
public static final String _127_0_0_1 = "127.0.0.1";
public static final String LOCALHOST = "localhost";
- public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
- public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
- public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
- public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
- public static final String ZOOKEEPER_TOKEN_NODE = "/token";
- public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
- public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+
public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 784d214..1fca2d0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.DBUtil;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -551,7 +552,7 @@ public class GFacUtils {
* @throws InterruptedException
*/
public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
- String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+ String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
List<String> children = curatorClient.getChildren().forPath(experimentNode);
for (String pickedChild : children) {
String experimentPath = experimentNode + File.separator + pickedChild;
@@ -568,9 +569,9 @@ public class GFacUtils {
public static boolean setExperimentCancelRequest(String processId, CuratorFramework curatorClient, long
deliveryTag) throws Exception {
- String experimentNode = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
- String cancelListenerNodePath = ZKPaths.makePath(experimentNode, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
- curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, GFacConstants.ZOOKEEPER_CANCEL_REQEUST
+ String experimentNode = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+ String cancelListenerNodePath = ZKPaths.makePath(experimentNode, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
.getBytes());
return true;
}
@@ -768,7 +769,7 @@ public class GFacUtils {
// }
public static String getZKGfacServersParentPath() {
- return ZKPaths.makePath(GFacConstants.ZOOKEEPER_SERVERS_NODE, GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE);
+ return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE);
}
public static JobDescriptor createJobDescriptor(ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
@@ -1113,11 +1114,11 @@ public class GFacUtils {
}
public static String getExperimentNodePath(String experimentId) {
- return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
+ return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
}
public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId) throws Exception {
- String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
+ String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + ZkConstants
.ZOOKEEPER_DELIVERYTAG_NODE;
byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
return GFacUtils.bytesToLong(bytes);
http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index bfeac89..58d2817 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.impl.watcher;
+import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
import org.apache.airavata.gfac.impl.Factory;
@@ -44,7 +45,7 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
byte[] bytes = curatorClient.getData().forPath(path);
String processId = path.substring(path.lastIndexOf("/") + 1);
String action = new String(bytes);
- if (action.equalsIgnoreCase("CANCEL")) {
+ if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
if (processContext != null) {
processContext.setCancel(true);
@@ -56,9 +57,12 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
curatorClient.getData().usingWatcher(this).forPath(path);
}
break;
+ case NodeDeleted:
+ //end of experiment execution, ignore this event
+ break;
case NodeCreated:
case NodeChildrenChanged:
- case NodeDeleted:
+ case None:
curatorClient.getData().usingWatcher(this).forPath(path);
break;
default:
http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
index 4738edb..dc5317f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -58,6 +58,8 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
}
break;
case NodeDeleted:
+ //end of experiment execution, ignore this event
+ break;
case NodeCreated:
case NodeChildrenChanged:
case None:
http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9ebfa05..1040b05 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacException;
@@ -54,6 +55,7 @@ import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,7 +118,7 @@ public class GfacServerHandler implements GfacService.Iface {
airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
// create PERSISTENT nodes
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath());
- ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), ZkConstants.ZOOKEEPER_EXPERIMENT_NODE);
// create EPHEMERAL server name node
String gfacName = ServerSettings.getGFacServerName();
if (curatorClient.checkExists().forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath() ,gfacName)) == null) {
@@ -196,7 +198,7 @@ public class GfacServerHandler implements GfacService.Iface {
private String gfacServerName;
public ProcessLaunchMessageHandler() throws ApplicationSettingsException {
- experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+ experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
gfacServerName = ServerSettings.getGFacServerName();
}
@@ -226,8 +228,7 @@ public class GfacServerHandler implements GfacService.Iface {
if (Factory.getGfacContext().getProcess(event.getProcessId()) != null) {
// update deliver tag
try {
- updateDeliveryTag(curatorClient, gfacServerName, event.getProcessId(), message
- .getDeliveryTag());
+ updateDeliveryTag(curatorClient, gfacServerName, event, message );
return;
} catch (Exception e) {
log.error("Error while updating delivery tag for redelivery message , messageId : " +
@@ -254,8 +255,7 @@ public class GfacServerHandler implements GfacService.Iface {
.getProcessId());
publishProcessStatus(event, status);
try {
- createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message
- .getDeliveryTag(), event.getTokenId());
+ createProcessZKNode(curatorClient, gfacServerName, event, message);
submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
} catch (Exception e) {
log.error(e.getMessage(), e);
@@ -306,38 +306,55 @@ public class GfacServerHandler implements GfacService.Iface {
statusPublisher.publish(msgCtx);
}
- private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String
- processId, long deliveryTag, String token) throws Exception {
- // TODO - To handle multiple processes per experiment, need to create a /experiments/{expId}/{processId} node
- // create /experiments/{processId} node and set data - serverName, add redelivery listener
- String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+ private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName,ProcessSubmitEvent event
+ ,MessageContext messageContext) throws Exception {
+ String processId = event.getProcessId();
+ String token = event.getTokenId();
+ String experimentId = event.getExperimentId();
+ long deliveryTag = messageContext.getDeliveryTag();
+
+ // create /experiments/{experimentId}/{processId} node and set data - serverName, add redelivery listener
+ String experimentNodePath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId;
+ String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath);
- // create /experiments/{processId}/deliveryTag node and set data - deliveryTag
- String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+ // create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag
+ String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
- // create /experiments/{processId}/token node and set data - token
- String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE);
+ // create /experiments/{experimentId}/{processId}/token node and set data - token
+ String tokenNodePath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_TOKEN_NODE);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath);
curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes());
- // create /experiments/{processId}/cancelListener node and set watcher for data changes
- String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ // create /experiments/{experimentId}/{processId}/cancelListener node and set watcher for data changes
+/* String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode);
- curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);
+ curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/
+
+ // create /experiments/{experimentId}/cancelListener node and set watcher for data changes
+ String experimentCancelNode = experimentNodePath + "/" + ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE;
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
+ curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath (experimentCancelNode);
+
}
- private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, String processId, long
- deliveryTag) throws Exception {
- // create /experiments/{processId} node and set data - serverName, add redelivery listener
- String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
- // create /experiments/{processId}/deliveryTag node and set data - deliveryTag
- String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
- curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
+ private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent event,
+ MessageContext messageContext) throws Exception {
+ String experimentId = event.getExperimentId();
+ String processId = event.getProcessId();
+ long deliveryTag = messageContext.getDeliveryTag();
+ String processNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+ experimentId), processId);
+ Stat stat = curatorClient.checkExists().forPath(processNodePath);
+ if (stat != null) {
+ // set data of the existing /experiments/{experimentId}/{processId}/deliveryTag node - deliveryTag
+ String deliveryTagPath = ZKPaths.makePath(processNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+ curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 8427d0c..acb5530 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
@@ -59,8 +60,14 @@ import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractRes
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
import org.apache.airavata.registry.cpi.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +87,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private String gatewayName;
private Publisher publisher;
private RabbitMQStatusConsumer statusConsumer;
+ private CuratorFramework curatorClient;
/**
* Query orchestrator server to fetch the CPI version
@@ -109,6 +117,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
statusConsumer.listen(new ProcessStatusHandler());
+ startCurator();
} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) {
log.error(e.getMessage(), e);
throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -209,7 +218,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
*/
public boolean terminateExperiment(String experimentId, String tokenId) throws TException {
log.info(experimentId, "Experiment: {} is cancelling !!!!!", experimentId);
- return validateStatesAndCancel(experimentId, tokenId);
+ try {
+ return validateStatesAndCancel(experimentId, tokenId);
+ } catch (Exception e) {
+ log.error("expId : " + experimentId + " :- Error while cancelling experiment", e);
+ return false;
+ }
}
private String getAiravataUserName() {
@@ -277,7 +291,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
List<ComputeResourceDescription> computeHostList = Arrays.asList(deploymentMap.keySet().toArray(new ComputeResourceDescription[]{}));
Class<? extends HostScheduler> aClass = Class.forName(
ServerSettings.getHostScheduler()).asSubclass(
- HostScheduler.class);
+ HostScheduler.class);
HostScheduler hostScheduler = aClass.newInstance();
ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList);
return deploymentMap.get(ComputeResourceDescription);
@@ -297,124 +311,15 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
return selectedModuleId;
}
- private boolean validateStatesAndCancel(String experimentId, String tokenId)throws TException{
- // FIXME
-// try {
-// Experiment experiment = (Experiment) experimentCatalog.get(
-// ExperimentCatalogModelType.EXPERIMENT, experimentId);
-// log.info("Waiting for zookeeper to connect to the server");
-// synchronized (mutex){
-// mutex.wait(5000);
-// }
-// if (experiment == null) {
-// log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", experimentId);
-// throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId);
-// }
-// ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
-// if (isCancelValid(experimentState)){
-// ExperimentStatus status = new ExperimentStatus();
-// status.setExperimentState(ExperimentState.CANCELING);
-// status.setTimeOfStateChange(Calendar.getInstance()
-// .getTimeInMillis());
-// experiment.setExperimentStatus(status);
-// experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-// experimentId);
-//
-// List<String> ids = experimentCatalog.getIds(
-// ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-// WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-// for (String workflowNodeId : ids) {
-// WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) experimentCatalog
-// .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-// workflowNodeId);
-// int value = workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue();
-// if ( value> 1 && value < 7) { // we skip the unknown state
-// log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot mark as cancelled, because " +
-// "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
-// continue; // this continue is very useful not to process deeper loops if the upper layers have non-cancel states
-// } else {
-// WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-// workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
-// workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-// .getTimeInMillis());
-// workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-// experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
-// workflowNodeId);
-// }
-// List<Object> taskDetailList = experimentCatalog.get(
-// ExperimentCatalogModelType.TASK_DETAIL,
-// TaskDetailConstants.NODE_ID, workflowNodeId);
-// for (Object o : taskDetailList) {
-// TaskDetails taskDetails = (TaskDetails) o;
-// TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-// if (taskStatus.getExecutionState().getValue() > 7 && taskStatus.getExecutionState().getValue()<12) {
-// log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark as cancelled, because " +
-// "current task state is " + ((TaskDetails) o).getTaskStatus().getExecutionState().toString());
-// continue;// this continue is very useful not to process deeper loops if the upper layers have non-cancel states
-// } else {
-// taskStatus.setExecutionState(TaskState.CANCELING);
-// taskStatus.setTimeOfStateChange(Calendar.getInstance()
-// .getTimeInMillis());
-// taskDetails.setTaskStatus(taskStatus);
-// experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o,
-// taskDetails.getTaskID());
-// }
-// orchestrator.cancelExperiment(experiment,
-// workflowNodeDetail, taskDetails, tokenId);
-// // Status update should be done at the monitor
-// }
-// }
-// }else {
-// if (isCancelAllowed(experimentState)){
-// // when experiment status is < 3 no jobDetails object is created,
-// // so we don't have to worry, we simply have to change the status and stop the execution
-// ExperimentStatus status = new ExperimentStatus();
-// status.setExperimentState(ExperimentState.CANCELED);
-// status.setTimeOfStateChange(Calendar.getInstance()
-// .getTimeInMillis());
-// experiment.setExperimentStatus(status);
-// experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-// experimentId);
-// List<String> ids = experimentCatalog.getIds(
-// ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-// WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-// for (String workflowNodeId : ids) {
-// WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) experimentCatalog
-// .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-// workflowNodeId);
-// WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-// workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
-// workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-// .getTimeInMillis());
-// workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-// experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
-// workflowNodeId);
-// List<Object> taskDetailList = experimentCatalog.get(
-// ExperimentCatalogModelType.TASK_DETAIL,
-// TaskDetailConstants.NODE_ID, workflowNodeId);
-// for (Object o : taskDetailList) {
-// TaskDetails taskDetails = (TaskDetails) o;
-// TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-// taskStatus.setExecutionState(TaskState.CANCELED);
-// taskStatus.setTimeOfStateChange(Calendar.getInstance()
-// .getTimeInMillis());
-// taskDetails.setTaskStatus(taskStatus);
-// experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o,
-// taskDetails);
-// }
-// }
-// }else {
-// log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.",
-// experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
-// throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: "
-// + experiment.getExperimentStatus().getExperimentState().toString());
-// }
-// }
-// log.info("Experiment: " + experimentId + " is cancelled !!!!!");
-// } catch (Exception e) {
-// throw new TException(e);
-// }
- return true;
+ private boolean validateStatesAndCancel(String experimentId, String tokenId) throws Exception {
+ String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+ experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ Stat stat = curatorClient.checkExists().forPath(expCancelNodePath);
+ if (stat != null) {
+ curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
+ .getBytes());
+ }
+ return true;
}
private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
@@ -427,6 +332,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
// }
}
+ private void startCurator() throws ApplicationSettingsException {
+ String connectionSting = ServerSettings.getZookeeperConnection();
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+ curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+ curatorClient.start();
+ }
private class SingleAppExperimentRunner implements Runnable {
String experimentId;
[2/2] airavata git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/be9af52e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/be9af52e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/be9af52e
Branch: refs/heads/master
Commit: be9af52e246ba5f91fba83d4b95bc21600a490d0
Parents: e0483f3 420a031
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Sep 3 19:57:26 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Sep 3 19:57:26 2015 -0400
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 3 +-
.../java/org/apache/airavata/api/Airavata.java | 141 ++-----------------
.../main/resources/lib/airavata/Airavata.cpp | 35 +----
.../src/main/resources/lib/airavata/Airavata.h | 32 ++---
.../lib/airavata/Airavata_server.skeleton.cpp | 4 +-
.../resources/lib/Airavata/API/Airavata.php | 42 +-----
.../lib/apache/airavata/api/Airavata-remote | 8 +-
.../lib/apache/airavata/api/Airavata.py | 39 +----
.../client/samples/CreateLaunchBES.java | 2 +-
.../client/samples/CreateLaunchExperiment.java | 2 +-
.../samples/CreateLaunchExperimentUS3.java | 4 +-
.../tools/RegisterOGCEUS3Application.java | 20 +--
.../tools/RegisterSampleApplications.java | 14 +-
.../client/tools/RegisterUS3Application.java | 20 +--
.../org/apache/airavata/model/Workflow.java | 2 +-
.../ApplicationDeploymentDescription.java | 2 +-
.../appdeployment/ApplicationModule.java | 2 +-
.../appcatalog/appdeployment/SetEnvPaths.java | 2 +-
.../ApplicationInterfaceDescription.java | 2 +-
.../appcatalog/computeresource/BatchQueue.java | 2 +-
.../computeresource/CloudJobSubmission.java | 2 +-
.../ComputeResourceDescription.java | 2 +-
.../computeresource/DataMovementInterface.java | 2 +-
.../computeresource/GlobusJobSubmission.java | 2 +-
.../computeresource/GridFTPDataMovement.java | 2 +-
.../computeresource/JobSubmissionInterface.java | 2 +-
.../computeresource/LOCALDataMovement.java | 2 +-
.../computeresource/LOCALSubmission.java | 2 +-
.../computeresource/ResourceJobManager.java | 2 +-
.../computeresource/SCPDataMovement.java | 2 +-
.../computeresource/SSHJobSubmission.java | 2 +-
.../computeresource/UnicoreDataMovement.java | 2 +-
.../computeresource/UnicoreJobSubmission.java | 2 +-
.../ComputeResourcePreference.java | 2 +-
.../gatewayprofile/GatewayResourceProfile.java | 2 +-
.../application/io/InputDataObjectType.java | 2 +-
.../application/io/OutputDataObjectType.java | 2 +-
.../airavata/model/commons/ErrorModel.java | 2 +-
.../model/commons/ValidationResults.java | 2 +-
.../airavata/model/commons/ValidatorResult.java | 2 +-
.../model/error/AiravataClientException.java | 2 +-
.../model/error/AiravataSystemException.java | 2 +-
.../model/error/AuthenticationException.java | 2 +-
.../model/error/AuthorizationException.java | 2 +-
.../error/ExperimentNotFoundException.java | 2 +-
.../model/error/InvalidRequestException.java | 2 +-
.../model/error/LaunchValidationException.java | 2 +-
.../model/error/ProjectNotFoundException.java | 2 +-
.../airavata/model/error/TimedOutException.java | 2 +-
.../airavata/model/error/ValidationResults.java | 2 +-
.../airavata/model/error/ValidatorResult.java | 2 +-
.../model/experiment/ExperimentModel.java | 2 +-
.../model/experiment/ExperimentStatistics.java | 2 +-
.../experiment/ExperimentSummaryModel.java | 2 +-
.../experiment/UserConfigurationDataModel.java | 2 +-
.../org/apache/airavata/model/job/JobModel.java | 2 +-
.../event/ExperimentStatusChangeEvent.java | 2 +-
.../model/messaging/event/JobIdentifier.java | 2 +-
.../messaging/event/JobStatusChangeEvent.java | 2 +-
.../event/JobStatusChangeRequestEvent.java | 2 +-
.../airavata/model/messaging/event/Message.java | 2 +-
.../messaging/event/ProcessIdentifier.java | 2 +-
.../event/ProcessStatusChangeEvent.java | 2 +-
.../event/ProcessStatusChangeRequestEvent.java | 2 +-
.../messaging/event/ProcessSubmitEvent.java | 2 +-
.../messaging/event/ProcessTerminateEvent.java | 2 +-
.../model/messaging/event/TaskIdentifier.java | 2 +-
.../messaging/event/TaskOutputChangeEvent.java | 2 +-
.../messaging/event/TaskStatusChangeEvent.java | 2 +-
.../event/TaskStatusChangeRequestEvent.java | 2 +-
.../airavata/model/process/ProcessModel.java | 2 +-
.../ComputationalResourceSchedulingModel.java | 2 +-
.../airavata/model/security/AuthzToken.java | 2 +-
.../airavata/model/status/ExperimentStatus.java | 2 +-
.../apache/airavata/model/status/JobStatus.java | 2 +-
.../airavata/model/status/ProcessStatus.java | 2 +-
.../airavata/model/status/TaskStatus.java | 2 +-
.../model/task/DataStagingTaskModel.java | 2 +-
.../apache/airavata/model/task/TaskModel.java | 2 +-
.../airavata/model/workspace/Gateway.java | 2 +-
.../apache/airavata/model/workspace/Group.java | 2 +-
.../airavata/model/workspace/Project.java | 2 +-
.../apache/airavata/model/workspace/User.java | 2 +-
.../apache/airavata/gfac/core/GFacUtils.java | 84 +++++++++++
.../airavata/gfac/core/context/TaskContext.java | 20 +++
.../airavata/gfac/impl/GFacEngineImpl.java | 2 +
.../impl/task/AdvancedSCPDataStageTask.java | 23 ++-
.../catalog/impl/ExperimentRegistry.java | 2 +-
.../airavata-api/airavata_api.thrift | 2 +-
89 files changed, 266 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/be9af52e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------