You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/10/18 15:46:05 UTC
[airavata] branch staging updated: Adding prefix to monitoring paths
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new 656b3bf Adding prefix to monitoring paths
656b3bf is described below
commit 656b3bf2744f83e40457cf25ec5e85ada44b8a89
Author: Dimuthu Wannipurage <di...@datasprouts.com>
AuthorDate: Thu Oct 18 11:45:56 2018 -0400
Adding prefix to monitoring paths
---
.../airavata/helix/core/util/MonitoringUtil.java | 69 +++++++++++-----------
1 file changed, 35 insertions(+), 34 deletions(-)
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
index f9a0418..2434196 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
@@ -14,6 +14,7 @@ public class MonitoringUtil {
private final static Logger logger = LoggerFactory.getLogger(MonitoringUtil.class);
+ private static final String PATH_PREFIX = "/airavata";
private static final String MONITORING = "/monitoring/";
private static final String REGISTRY = "/registry/";
@@ -40,30 +41,30 @@ public class MonitoringUtil {
+ processId + ", gateway : " + gateway);
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- MONITORING + jobId + LOCK, new byte[0]);
+ PATH_PREFIX + MONITORING + jobId + LOCK, new byte[0]);
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- MONITORING + jobId + GATEWAY, gateway.getBytes());
+ PATH_PREFIX + MONITORING + jobId + GATEWAY, gateway.getBytes());
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- MONITORING + jobId + PROCESS, processId.getBytes());
+ PATH_PREFIX + MONITORING + jobId + PROCESS, processId.getBytes());
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- MONITORING + jobId + TASK, taskId.getBytes());
+ PATH_PREFIX + MONITORING + jobId + TASK, taskId.getBytes());
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- MONITORING + jobId + EXPERIMENT, experimentId.getBytes());
+ PATH_PREFIX + MONITORING + jobId + EXPERIMENT, experimentId.getBytes());
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- MONITORING + jobId + JOB_NAME, jobName.getBytes());
+ PATH_PREFIX + MONITORING + jobId + JOB_NAME, jobName.getBytes());
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- MONITORING + jobName + JOB_ID, jobId.getBytes());
+ PATH_PREFIX + MONITORING + jobName + JOB_ID, jobId.getBytes());
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- REGISTRY + processId + JOBS, jobId.getBytes());
+ PATH_PREFIX + REGISTRY + processId + JOBS, jobId.getBytes());
}
public static void registerWorkflow(CuratorFramework curatorClient, String processId, String workflowId) throws Exception {
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- REGISTRY + processId + WORKFLOWS + "/" + workflowId , new byte[0]);
+ PATH_PREFIX + REGISTRY + processId + WORKFLOWS + "/" + workflowId , new byte[0]);
}
public static void registerCancelProcess(CuratorFramework curatorClient, String processId) throws Exception {
- String path = REGISTRY + processId + STATUS;
+ String path = PATH_PREFIX + REGISTRY + processId + STATUS;
if (curatorClient.checkExists().forPath(path) != null) {
curatorClient.delete().forPath(path);
}
@@ -72,7 +73,7 @@ public class MonitoringUtil {
}
public static int getTaskRetryCount(CuratorFramework curatorClient, String taskId) throws Exception {
- String path = TASK + "/" + taskId + RETRY;
+ String path = PATH_PREFIX + TASK + "/" + taskId + RETRY;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] processBytes = curatorClient.getData().forPath(path);
return Integer.parseInt(new String(processBytes));
@@ -82,7 +83,7 @@ public class MonitoringUtil {
}
public static void increaseTaskRetryCount(CuratorFramework curatorClient, String takId, int currentRetryCount) throws Exception {
- String path = TASK + "/" + takId + RETRY;
+ String path = PATH_PREFIX + TASK + "/" + takId + RETRY;
if (curatorClient.checkExists().forPath(path) != null) {
curatorClient.delete().forPath(path);
}
@@ -91,7 +92,7 @@ public class MonitoringUtil {
}
public static String getExperimentIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
- String path = MONITORING + jobId + EXPERIMENT;
+ String path = PATH_PREFIX + MONITORING + jobId + EXPERIMENT;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] processBytes = curatorClient.getData().forPath(path);
return new String(processBytes);
@@ -101,7 +102,7 @@ public class MonitoringUtil {
}
public static String getTaskIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
- String path = MONITORING + jobId + TASK;
+ String path = PATH_PREFIX + MONITORING + jobId + TASK;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] processBytes = curatorClient.getData().forPath(path);
return new String(processBytes);
@@ -111,7 +112,7 @@ public class MonitoringUtil {
}
public static String getProcessIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
- String path = MONITORING + jobId + PROCESS;
+ String path = PATH_PREFIX + MONITORING + jobId + PROCESS;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] processBytes = curatorClient.getData().forPath(path);
return new String(processBytes);
@@ -121,7 +122,7 @@ public class MonitoringUtil {
}
public static String getGatewayByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
- String path = MONITORING + jobId + GATEWAY;
+ String path = PATH_PREFIX + MONITORING + jobId + GATEWAY;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] gatewayBytes = curatorClient.getData().forPath(path);
return new String(gatewayBytes);
@@ -131,7 +132,7 @@ public class MonitoringUtil {
}
public static void updateStatusOfJob(CuratorFramework curatorClient, String jobId, JobState jobState) throws Exception {
- String path = MONITORING + jobId + STATUS;
+ String path = PATH_PREFIX + MONITORING + jobId + STATUS;
if (curatorClient.checkExists().forPath(path) != null) {
curatorClient.delete().forPath(path);
}
@@ -139,7 +140,7 @@ public class MonitoringUtil {
}
public static JobState getCurrentStatusOfJob(CuratorFramework curatorClient, String jobId) throws Exception {
- String path = MONITORING + jobId + STATUS;
+ String path = PATH_PREFIX + MONITORING + jobId + STATUS;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] gatewayBytes = curatorClient.getData().forPath(path);
return JobState.valueOf(new String(gatewayBytes));
@@ -149,7 +150,7 @@ public class MonitoringUtil {
}
public static String getJobIdByProcessId(CuratorFramework curatorClient, String processId) throws Exception {
- String path = REGISTRY + processId + JOBS;
+ String path = PATH_PREFIX + REGISTRY + processId + JOBS;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] gatewayBytes = curatorClient.getData().forPath(path);
return new String(gatewayBytes);
@@ -159,7 +160,7 @@ public class MonitoringUtil {
}
public static String getJobNameByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
- String path = MONITORING + jobId + JOB_NAME;
+ String path = PATH_PREFIX + MONITORING + jobId + JOB_NAME;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] gatewayBytes = curatorClient.getData().forPath(path);
return new String(gatewayBytes);
@@ -169,7 +170,7 @@ public class MonitoringUtil {
}
public static String getJobIdByJobName(CuratorFramework curatorClient, String jobName) throws Exception {
- String path = MONITORING + jobName + JOB_ID;
+ String path = PATH_PREFIX + MONITORING + jobName + JOB_ID;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] gatewayBytes = curatorClient.getData().forPath(path);
return new String(gatewayBytes);
@@ -179,12 +180,12 @@ public class MonitoringUtil {
}
public static boolean hasMonitoringRegistered(CuratorFramework curatorClient, String jobId) throws Exception {
- Stat stat = curatorClient.checkExists().forPath(MONITORING + jobId);
+ Stat stat = curatorClient.checkExists().forPath(PATH_PREFIX + MONITORING + jobId);
return stat != null;
}
public static String getStatusOfProcess(CuratorFramework curatorClient, String processId) throws Exception {
- String path = REGISTRY + processId + STATUS;
+ String path = PATH_PREFIX + REGISTRY + processId + STATUS;
if (curatorClient.checkExists().forPath(path) != null) {
byte[] statusBytes = curatorClient.getData().forPath(path);
return new String(statusBytes);
@@ -194,7 +195,7 @@ public class MonitoringUtil {
}
public static List<String> getWorkflowsOfProcess(CuratorFramework curatorClient, String processId) throws Exception {
- String path = REGISTRY + processId + WORKFLOWS;
+ String path = PATH_PREFIX + REGISTRY + processId + WORKFLOWS;
if (curatorClient.checkExists().forPath(path) != null) {
return curatorClient.getChildren().forPath(path);
} else {
@@ -209,7 +210,7 @@ public class MonitoringUtil {
}
public static void deleteTaskSpecificNodes(CuratorFramework curatorClient, String takId) throws Exception {
- deleteIfExists(curatorClient, TASK + "/" + takId + RETRY);
+ deleteIfExists(curatorClient, PATH_PREFIX + TASK + "/" + takId + RETRY);
}
public static void deleteProcessSpecificNodes(CuratorFramework curatorClient, String processId) throws Exception {
@@ -218,18 +219,18 @@ public class MonitoringUtil {
if (jobId != null) {
logger.info("Deleting zookeeper paths in job monitoring for job id : " + jobId);
- deleteIfExists(curatorClient, MONITORING + jobId + LOCK);
- deleteIfExists(curatorClient, MONITORING + jobId + GATEWAY);
- deleteIfExists(curatorClient, MONITORING + jobId + PROCESS);
- deleteIfExists(curatorClient, MONITORING + jobId + TASK);
- deleteIfExists(curatorClient, MONITORING + jobId + EXPERIMENT);
- deleteIfExists(curatorClient, MONITORING + jobId + JOB_NAME);
+ deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + LOCK);
+ deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + GATEWAY);
+ deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + PROCESS);
+ deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + TASK);
+ deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + EXPERIMENT);
+ deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobId + JOB_NAME);
String jobName = getJobNameByJobId(curatorClient, jobId);
- deleteIfExists(curatorClient, MONITORING + jobName + JOB_ID);
+ deleteIfExists(curatorClient, PATH_PREFIX + MONITORING + jobName + JOB_ID);
}
- deleteIfExists(curatorClient, REGISTRY + processId + JOBS);
- deleteIfExists(curatorClient, REGISTRY + processId + WORKFLOWS);
+ deleteIfExists(curatorClient, PATH_PREFIX + REGISTRY + processId + JOBS);
+ deleteIfExists(curatorClient, PATH_PREFIX + REGISTRY + processId + WORKFLOWS);
}
}