You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/13 22:24:02 UTC
airavata git commit: Don't need to check the GFac node in ZooKeeper
before publishing the Terminate request to the work queue.
Repository: airavata
Updated Branches:
refs/heads/master 420a51ad6 -> 2bf83dd03
Don't need to check the GFac node in ZooKeeper before publishing the Terminate request to the work queue.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2bf83dd0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2bf83dd0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2bf83dd0
Branch: refs/heads/master
Commit: 2bf83dd03cb1f312a8c1de114aedcd0801337901
Parents: 420a51a
Author: shamrath <sh...@gmail.com>
Authored: Wed May 13 16:24:00 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Wed May 13 16:24:00 2015 -0400
----------------------------------------------------------------------
.../core/impl/GFACPassiveJobSubmitter.java | 55 +++-----------------
1 file changed, 8 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2bf83dd0/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index cd5b45b..1faef9e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -135,20 +135,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
* @throws OrchestratorException
*/
public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException {
- ZooKeeper zk = orchestratorContext.getZk();
+ String gatewayId = null;
try {
- if (zk == null || !zk.getState().isConnected()) {
- String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
- logger.info("Waiting for zookeeper to connect to the server");
- synchronized (mutex) {
- mutex.wait(5000);
- }
- }
- String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
- String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- List<String> children = zk.getChildren(gfacServer, this);
- String gatewayId = null;
CredentialReader credentialReader = GFacUtils.getCredentialReader();
if (credentialReader != null) {
try {
@@ -158,45 +146,18 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
}
}
if (gatewayId == null || gatewayId.isEmpty()) {
+
gatewayId = ServerSettings.getDefaultUserGateway();
+
}
- if (children.size() == 0) {
- // Zookeeper data need cleaning
- throw new OrchestratorException("There is no active GFac instance to route the request");
- } else {
- String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
- // here we are not using an index because the getChildren does not return the same order everytime
- String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
- logger.info("GFAC instance node data: " + gfacNodeData);
- String[] split = gfacNodeData.split(":");
- if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
- // before submitting the job we check again the state of the node
- TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID, taskID, gatewayId, tokenId);
- MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
- messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- publisher.publish(messageContext);
- }
- }
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
- } catch (KeeperException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
+ TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID, taskID, gatewayId, tokenId);
+ MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
+ return true;
} catch (Exception e) {
- logger.error(e.getMessage(), e);
throw new OrchestratorException(e);
- }finally {
- closeZK(orchestratorContext);
}
- return false;
-
}
private void closeZK(OrchestratorContext orchestratorContext) {