You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/10/18 15:46:05 UTC

[airavata] branch staging updated: Adding prefix to monitoring paths

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new 656b3bf  Adding prefix to monitoring paths
656b3bf is described below

commit 656b3bf2744f83e40457cf25ec5e85ada44b8a89
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Thu Oct 18 11:45:56 2018 -0400

    Adding prefix to monitoring paths
---
 .../airavata/helix/core/util/MonitoringUtil.java   | 69 +++++++++++-----------
 1 file changed, 35 insertions(+), 34 deletions(-)

diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
index f9a0418..2434196 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
@@ -14,6 +14,7 @@ public class MonitoringUtil {
 
     private final static Logger logger = LoggerFactory.getLogger(MonitoringUtil.class);
 
+    private static final String PATH_PREFIX = "/airavata";
     private static final String MONITORING = "/monitoring/";
     private static final String REGISTRY = "/registry/";
 
@@ -40,30 +41,30 @@ public class MonitoringUtil {
                 + processId + ", gateway : " + gateway);
 
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                MONITORING + jobId + LOCK, new byte[0]);
+                PATH_PREFIX + MONITORING + jobId + LOCK, new byte[0]);
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                MONITORING + jobId + GATEWAY, gateway.getBytes());
+                PATH_PREFIX + MONITORING + jobId + GATEWAY, gateway.getBytes());
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                MONITORING + jobId + PROCESS, processId.getBytes());
+                PATH_PREFIX + MONITORING + jobId + PROCESS, processId.getBytes());
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                MONITORING + jobId + TASK, taskId.getBytes());
+                PATH_PREFIX + MONITORING + jobId + TASK, taskId.getBytes());
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                MONITORING + jobId + EXPERIMENT, experimentId.getBytes());
+                PATH_PREFIX + MONITORING + jobId + EXPERIMENT, experimentId.getBytes());
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                MONITORING + jobId + JOB_NAME, jobName.getBytes());
+                PATH_PREFIX + MONITORING + jobId + JOB_NAME, jobName.getBytes());
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                MONITORING + jobName + JOB_ID, jobId.getBytes());
+                PATH_PREFIX + MONITORING + jobName + JOB_ID, jobId.getBytes());
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                REGISTRY + processId + JOBS, jobId.getBytes());
+                PATH_PREFIX + REGISTRY + processId + JOBS, jobId.getBytes());
     }
 
     public static void registerWorkflow(CuratorFramework curatorClient, String processId, String workflowId) throws Exception {
         curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                REGISTRY + processId + WORKFLOWS + "/" + workflowId , new byte[0]);
+                PATH_PREFIX + REGISTRY + processId + WORKFLOWS + "/" + workflowId , new byte[0]);
     }
 
     public static void registerCancelProcess(CuratorFramework curatorClient, String processId) throws Exception {
-        String path = REGISTRY + processId + STATUS;
+        String path = PATH_PREFIX + REGISTRY + processId + STATUS;
         if (curatorClient.checkExists().forPath(path) != null) {
             curatorClient.delete().forPath(path);
         }
@@ -72,7 +73,7 @@ public class MonitoringUtil {
     }
 
     public static int getTaskRetryCount(CuratorFramework curatorClient, String taskId) throws Exception {
-        String path = TASK + "/" + taskId + RETRY;
+        String path = PATH_PREFIX + TASK + "/" + taskId + RETRY;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] processBytes = curatorClient.getData().forPath(path);
             return Integer.parseInt(new String(processBytes));
@@ -82,7 +83,7 @@ public class MonitoringUtil {
     }
 
     public static void increaseTaskRetryCount(CuratorFramework curatorClient, String takId, int currentRetryCount) throws Exception {
-        String path = TASK + "/" + takId + RETRY;
+        String path = PATH_PREFIX + TASK + "/" + takId + RETRY;
         if (curatorClient.checkExists().forPath(path) != null) {
             curatorClient.delete().forPath(path);
         }
@@ -91,7 +92,7 @@ public class MonitoringUtil {
     }
 
     public static String getExperimentIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
-        String path = MONITORING + jobId + EXPERIMENT;
+        String path = PATH_PREFIX + MONITORING + jobId + EXPERIMENT;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] processBytes = curatorClient.getData().forPath(path);
             return new String(processBytes);
@@ -101,7 +102,7 @@ public class MonitoringUtil {
     }
 
     public static String getTaskIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
-        String path = MONITORING + jobId + TASK;
+        String path = PATH_PREFIX + MONITORING + jobId + TASK;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] processBytes = curatorClient.getData().forPath(path);
             return new String(processBytes);
@@ -111,7 +112,7 @@ public class MonitoringUtil {
     }
 
     public static String getProcessIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
-        String path = MONITORING + jobId + PROCESS;
+        String path = PATH_PREFIX + MONITORING + jobId + PROCESS;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] processBytes = curatorClient.getData().forPath(path);
             return new String(processBytes);
@@ -121,7 +122,7 @@ public class MonitoringUtil {
     }
 
     public static String getGatewayByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
-        String path = MONITORING + jobId + GATEWAY;
+        String path = PATH_PREFIX + MONITORING + jobId + GATEWAY;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] gatewayBytes = curatorClient.getData().forPath(path);
             return new String(gatewayBytes);
@@ -131,7 +132,7 @@ public class MonitoringUtil {
     }
 
     public static void updateStatusOfJob(CuratorFramework curatorClient, String jobId, JobState jobState) throws Exception {
-        String path = MONITORING + jobId + STATUS;
+        String path = PATH_PREFIX + MONITORING + jobId + STATUS;
         if (curatorClient.checkExists().forPath(path) != null) {
             curatorClient.delete().forPath(path);
         }
@@ -139,7 +140,7 @@ public class MonitoringUtil {
     }
 
     public static JobState getCurrentStatusOfJob(CuratorFramework curatorClient, String jobId) throws Exception {
-        String path = MONITORING + jobId + STATUS;
+        String path = PATH_PREFIX + MONITORING + jobId + STATUS;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] gatewayBytes = curatorClient.getData().forPath(path);
             return JobState.valueOf(new String(gatewayBytes));
@@ -149,7 +150,7 @@ public class MonitoringUtil {
     }
 
     public static String getJobIdByProcessId(CuratorFramework curatorClient, String processId) throws Exception {
-        String path = REGISTRY + processId + JOBS;
+        String path = PATH_PREFIX + REGISTRY + processId + JOBS;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] gatewayBytes = curatorClient.getData().forPath(path);
             return new String(gatewayBytes);
@@ -159,7 +160,7 @@ public class MonitoringUtil {
     }
 
     public static String getJobNameByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
-        String path = MONITORING + jobId + JOB_NAME;
+        String path = PATH_PREFIX + MONITORING + jobId + JOB_NAME;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] gatewayBytes = curatorClient.getData().forPath(path);
             return new String(gatewayBytes);
@@ -169,7 +170,7 @@ public class MonitoringUtil {
     }
 
     public static String getJobIdByJobName(CuratorFramework curatorClient, String jobName) throws Exception {
-        String path = MONITORING + jobName + JOB_ID;
+        String path = PATH_PREFIX + MONITORING + jobName + JOB_ID;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] gatewayBytes = curatorClient.getData().forPath(path);
             return new String(gatewayBytes);
@@ -179,12 +180,12 @@ public class MonitoringUtil {
     }
 
     public static boolean hasMonitoringRegistered(CuratorFramework curatorClient, String jobId) throws Exception {
-        Stat stat = curatorClient.checkExists().forPath(MONITORING + jobId);
+        Stat stat = curatorClient.checkExists().forPath(PATH_PREFIX + MONITORING + jobId);
         return stat != null;
     }
 
     public static String getStatusOfProcess(CuratorFramework curatorClient, String processId) throws Exception {
-        String path = REGISTRY + processId + STATUS;
+        String path = PATH_PREFIX + REGISTRY + processId + STATUS;
         if (curatorClient.checkExists().forPath(path) != null) {
             byte[] statusBytes = curatorClient.getData().forPath(path);
             return new String(statusBytes);
@@ -194,7 +195,7 @@ public class MonitoringUtil {
     }
 
     public static List<String> getWorkflowsOfProcess(CuratorFramework curatorClient, String processId) throws Exception {
-        String path = REGISTRY + processId + WORKFLOWS;
+        String path = PATH_PREFIX + REGISTRY + processId + WORKFLOWS;
         if (curatorClient.checkExists().forPath(path) != null) {
             return curatorClient.getChildren().forPath(path);
         } else {
@@ -209,7 +210,7 @@ public class MonitoringUtil {
     }
 
     public static void deleteTaskSpecificNodes(CuratorFramework curatorClient, String takId) throws Exception {
-        deleteIfExists(curatorClient, TASK + "/" + takId + RETRY);
+        deleteIfExists(curatorClient, PATH_PREFIX + TASK + "/" + takId + RETRY);
     }
 
     public static void deleteProcessSpecificNodes(CuratorFramework curatorClient, String processId) throws Exception {
@@ -218,18 +219,18 @@ public class MonitoringUtil {
 
         if (jobId != null) {
             logger.info("Deleting zookeeper paths in job monitoring for job id : " + jobId);
-            deleteIfExists(curatorClient, MONITORING + jobId + LOCK);
-            deleteIfExists(curatorClient, MONITORING + jobId + GATEWAY);
-            deleteIfExists(curatorClient, MONITORING + jobId + PROCESS);
-            deleteIfExists(curatorClient, MONITORING + jobId + TASK);
-            deleteIfExists(curatorClient, MONITORING + jobId + EXPERIMENT);
-            deleteIfExists(curatorClient, MONITORING + jobId + JOB_NAME);
+            deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + LOCK);
+            deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + GATEWAY);
+            deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + PROCESS);
+            deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + TASK);
+            deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + EXPERIMENT);
+            deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + JOB_NAME);
 
             String jobName = getJobNameByJobId(curatorClient, jobId);
-            deleteIfExists(curatorClient, MONITORING + jobName + JOB_ID);
+            deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobName + JOB_ID);
         }
 
-        deleteIfExists(curatorClient, REGISTRY + processId + JOBS);
-        deleteIfExists(curatorClient, REGISTRY + processId + WORKFLOWS);
+        deleteIfExists(curatorClient, PATH_PREFIX + REGISTRY + processId + JOBS);
+        deleteIfExists(curatorClient, PATH_PREFIX + REGISTRY + processId + WORKFLOWS);
     }
 }