You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2014/09/13 02:52:47 UTC
[1/2] git commit: Optimized zookeeper job count update process to
update job addition and job deletion steps
Repository: airavata
Updated Branches:
refs/heads/master 2207eceab -> d1d8759fd
Optimized zookeeper job count update process to update job addition and job deletion steps
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9c23aa81
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9c23aa81
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9c23aa81
Branch: refs/heads/master
Commit: 9c23aa81147268b70472df3001bd04575a457e9e
Parents: e45607a
Author: shamrath <sh...@gmail.com>
Authored: Fri Sep 12 17:24:59 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri Sep 12 17:24:59 2014 -0400
----------------------------------------------------------------------
.../handlers/GridPullMonitorHandler.java | 1 +
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 160 +++----------------
.../airavata/gfac/monitor/util/CommonUtils.java | 122 ++++++++++++++
3 files changed, 149 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 9ca5235..ff467bf 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -100,6 +100,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
e.printStackTrace();
}
CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
+ CommonUtils.increaseZkJobCount(monitorID); // add the newly queued job to the job count in zookeeper
} catch (AiravataMonitorException e) {
logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index b4c5819..dac9499 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -20,22 +20,7 @@
*/
package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
+import com.google.common.eventbus.EventBus;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -57,16 +42,20 @@ import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.airavata.schemas.gfac.SSHHostType;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.EventBus;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* This monitor is based on qstat command which can be run
@@ -275,10 +264,24 @@ public class HPCPullMonitor extends PullMonitor {
queue.put(take);
// cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if
// they become empty
+ Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>();
+ ZooKeeper zk = null;
for (MonitorID completedJob : completedJobs) {
CommonUtils.removeMonitorFromQueue(queue, completedJob);
+ if (zk == null) {
+ zk = completedJob.getJobExecutionContext().getZk();
+ }
+ String key = CommonUtils.getJobCountUpdatePath(completedJob);
+ int i = 0;
+ if (jobRemoveCountMap.containsKey(key)) {
+ i = Integer.valueOf(jobRemoveCountMap.get(key));
+ }
+ jobRemoveCountMap.put(key, ++i);
+ }
+ if (completedJobs.size() > 0) {
+ // subtract the completed jobs from the job counts stored in zookeeper
+ CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap, false);
}
-// updateZkWithJobCount(take , completedJobs);
} catch (InterruptedException e) {
if (!this.queue.contains(take)) {
try {
@@ -342,117 +345,6 @@ public class HPCPullMonitor extends PullMonitor {
}
/**
- * Build the /stat/{username}/{hostAddress}/job znode path and store job count
- *
- * @param userMonitorData
- * @param completedJobs
- * @throws ApplicationSettingsException
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- private void updateZkWithJobCount(UserMonitorData userMonitorData, List<MonitorID> completedJobs) {
- try {
- final CountDownLatch latch = new CountDownLatch(1);
- ZooKeeper zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- });
- latch.await();
-
- try {
- List<String> updatedPathList = new ArrayList<String>();
- String pathToUserName = new StringBuilder("/").append(Constants.STAT)
- .append("/").append(userMonitorData.getUserName()).toString();
- StringBuilder jobPathBuilder;
- for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
- jobPathBuilder = new StringBuilder(pathToUserName).append("/")
- .append(hostData.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
- checkAndCreateZNode(zk, jobPathBuilder.toString());
- int jobCount = 0;
- String jobCountStr = new String(zk.getData(jobPathBuilder.toString(), null, null));
- try {
- jobCount = Integer.parseInt(jobCountStr);
- } catch (NumberFormatException e) {
- // do nothing , keep jobCount 0
- }
- List<MonitorID> idList = hostData.getMonitorIDs();
- boolean updatePath = true;
- if (idList != null) {
- if (jobCount == idList.size()) {
- updatePath = false;
- } else {
- jobCount = idList.size();
- }
- // removed already updated jobs from complete jobs
- for (MonitorID monitorID : idList) {
- if (completedJobs.contains(monitorID)) {
- completedJobs.remove(monitorID);
- }
- }
- }
- if (updatePath) {
- zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1);
- updatedPathList.add(jobPathBuilder.toString());
- }
- }
-
- //handle completed jobs
- /* If all jobs are completed in a host then monitor queue remove such hosts from monitoring ,but we need
- to update those host's stat with JobCount 0 */
- for (MonitorID monitorID : completedJobs) {
- jobPathBuilder = new StringBuilder(pathToUserName).append("/")
- .append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
- zk.setData(jobPathBuilder.toString(), "0".getBytes(), -1);
- updatedPathList.add(jobPathBuilder.toString());
- }
- // trigger orchestrator watcher by saving the updated list to zookeeper
- if (updatedPathList.size() > 0) {
- StringBuilder strBuilder = new StringBuilder();
- for (String updatedPath : updatedPathList) {
- strBuilder.append(updatedPath).append(":");
- }
- strBuilder.deleteCharAt(strBuilder.length() - 1);
- zk.setData(("/" + Constants.STAT), strBuilder.toString().getBytes(), -1);
- }
- zk.close();
- } catch (KeeperException e) {
- logger.error("Error while storing job count to zookeeper", e);
- } catch (InterruptedException e) {
- logger.error("Error while storing job count to zookeeper", e);
- }
- } catch (IOException e) {
- logger.error("Error while connecting to the zookeeper server", e);
- } catch (ApplicationSettingsException e) {
- logger.error("Error while getting zookeeper hostport property", e);
- } catch (InterruptedException e) {
- logger.error("Error while waiting for SyncConnected message", e);
- }
-
- }
-
- /**
- * Check whether znode is exist in given path if not create a new znode
- * @param zk - zookeeper instance
- * @param path - path to check znode
- * @throws KeeperException
- * @throws InterruptedException
- */
- private void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
- if (zk.exists(path, null) == null) { // if znode doesn't exist
- if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist
- checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
- }
- zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode
- }
- }
-
-
- /**
* This is the method to stop the polling process
*
* @return if the stopping process is successful return true else false
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 6db4550..9cb544c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -20,6 +20,9 @@
*/
package org.apache.airavata.gfac.monitor.util;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -30,12 +33,22 @@ import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
public class CommonUtils {
private final static Logger logger = LoggerFactory.getLogger(CommonUtils.class);
@@ -204,4 +217,113 @@ public class CommonUtils {
}
}
}
+
+ /**
+ * Adds to or subtracts from the job counts stored at the given znode paths, then writes the changed paths to the stat znode.
+ * @param zk - ZooKeeper client to use; if null or not connected, a new client is created inside this method
+ * @param changeCountMap - znode path mapped to the number of jobs to add to or remove from that path's count
+ * @param isAdd - true to add each mapped count to the stored count (creating missing znodes), false to subtract it
+ */
+ public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, Integer> changeCountMap, boolean isAdd) {
+ StringBuilder changeZNodePaths = new StringBuilder();
+ try {
+ if (zk == null || !zk.getState().isConnected()) {
+ try {
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ countDownLatch.countDown(); // NOTE(review): released by any watcher event; the event state is not checked
+ }
+ }); // NOTE(review): the client created here is never closed by this method
+ countDownLatch.await();
+ } catch (ApplicationSettingsException e) {
+ logger.error("Error while reading zookeeper hostport string");
+ } catch (IOException e) {
+ logger.error("Error while reconnect attempt to zookeeper where zookeeper connection loss state");
+ }
+ } // NOTE(review): if the reconnect failed, zk is unchanged (possibly null) when the loop below uses it
+
+ for (String path : changeCountMap.keySet()) {
+ if (isAdd) {
+ CommonUtils.checkAndCreateZNode(zk, path);
+ }
+ byte[] byteData = zk.getData(path, null, null);
+ String nodeData;
+ if (byteData == null) {
+ if (isAdd) {
+ zk.setData(path, String.valueOf(changeCountMap.get(path)).getBytes(), -1);
+ } else {
+ // Not expected to happen; handled defensively in case the znode has no data, e.g. after a zookeeper communication failure
+ logger.warn("Couldn't reduce job count in " + path + " as it returns null data. Hence reset the job count to 0");
+ zk.setData(path, "0".getBytes(), -1);
+ }
+ } else {
+ nodeData = new String(byteData);
+ if (isAdd) {
+ zk.setData(path, String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes(), -1);
+ } else {
+ int previousCount = Integer.parseInt(nodeData);
+ int removeCount = changeCountMap.get(path);
+ if (previousCount >= removeCount) {
+ zk.setData(path, String.valueOf(previousCount - removeCount).getBytes(), -1);
+ } else {
+ // Not expected; the stored count is left unchanged and only an error is logged. TODO: decide whether to reset it to 0
+ logger.error("Requested remove job count is " + removeCount +
+ " which is higher than the existing job count " + previousCount
+ + " in " + path + " path.");
+ }
+ }
+ }
+ changeZNodePaths.append(path).append(":"); // appended for every path, including one whose count was not changed above
+ }
+
+ // write the colon-separated list of paths to the stat node to trigger orchestrator watchers
+ if (changeCountMap.size() > 0) {
+ changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
+ zk.setData("/" + Constants.STAT, changeZNodePaths.toString().getBytes(), -1);
+ }
+ } catch (KeeperException e) {
+ logger.error("Error while writing job count to zookeeper", e);
+ } catch (InterruptedException e) {
+ logger.error("Error while writing job count to zookeeper", e);
+ }
+
+ }
+
+ /**
+ * Increases by one the job count stored in zookeeper at the path built from the given monitorID.
+ * @param monitorID - job monitor ID; supplies the count path and, via its job execution context, the ZooKeeper client
+ */
+ public static void increaseZkJobCount(MonitorID monitorID) {
+ Map<String, Integer> addMap = new HashMap<String, Integer>();
+ addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1);
+ updateZkWithJobCount(monitorID.getJobExecutionContext().getZk(), addMap, true);
+ }
+
+ /**
+ * Builds the job count znode path for a given MonitorID: /{Constants.STAT}/{userName}/{hostAddress}/{Constants.JOB}
+ * @param monitorID - job monitor ID supplying the user name and the host address
+ * @return the constructed znode path
+ */
+ public static String getJobCountUpdatePath(MonitorID monitorID){
+ return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName())
+ .append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB).toString();
+ }
+
+ /**
+ * Checks whether a znode exists at the given path and, if not, creates it after recursively ensuring its parent exists.
+ * @param zk - zookeeper instance used for the exists and create calls
+ * @param path - znode path to check and, if missing, create
+ * @throws KeeperException propagated from the zookeeper exists/create calls
+ * @throws InterruptedException propagated from the zookeeper exists/create calls
+ */
+ private static void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
+ if (zk.exists(path, null) == null) { // the znode doesn't exist yet
+ if (path.lastIndexOf("/") > 1) { // the path has a parent segment: make sure the parent znode exists first
+ checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
+ }
+ zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a persistent znode with no data
+ }
+ }
}
[2/2] git commit: Merge branch 'master' into StoreGFacsJobCount
Posted by sh...@apache.org.
Merge branch 'master' into StoreGFacsJobCount
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d1d8759f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d1d8759f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d1d8759f
Branch: refs/heads/master
Commit: d1d8759fd81de460752bfebf845e8293c3b3b04c
Parents: 9c23aa8 2207ece
Author: shamrath <sh...@gmail.com>
Authored: Fri Sep 12 17:25:33 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri Sep 12 17:25:33 2014 -0400
----------------------------------------------------------------------
modules/configuration/server/src/main/resources/PBSTemplate.xslt | 2 +-
modules/configuration/server/src/main/resources/SLURMTemplate.xslt | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------