You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2014/09/12 04:43:42 UTC
[1/3] git commit: GFac pull monitor Write job count to zookeeper
Repository: airavata
Updated Branches:
refs/heads/master 68e81ef80 -> 45f0d68fd
GFac pull monitor Write job count to zookeeper
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/78ad2ef5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/78ad2ef5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/78ad2ef5
Branch: refs/heads/master
Commit: 78ad2ef58b04051bc5c41791b7c03f90da914e5c
Parents: 92ad9f1
Author: shamrath <sh...@gmail.com>
Authored: Thu Sep 11 21:35:41 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Thu Sep 11 21:35:41 2014 -0400
----------------------------------------------------------------------
.../apache/airavata/common/utils/Constants.java | 2 +
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 109 ++++++++++++++++++-
2 files changed, 110 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index b8f999a..8335e0c 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -46,4 +46,6 @@ public final class Constants {
public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
+ public static final String STAT = "stat";
+ public static final String JOB = "job";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 93e1aa9..3742179 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
@@ -28,9 +29,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -48,10 +53,15 @@ import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.JobStatus;
import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -242,6 +252,7 @@ public class HPCPullMonitor extends PullMonitor {
for (MonitorID completedJob : completedJobs) {
CommonUtils.removeMonitorFromQueue(queue, completedJob);
}
+ updateZkWithJobCount(take , completedJobs);
} catch (InterruptedException e) {
if (!this.queue.contains(take)) {
try {
@@ -304,6 +315,102 @@ public class HPCPullMonitor extends PullMonitor {
return true;
}
+ /**
+ * Build the /stat/{username}/{hostAddress}/job znode path and store job count
+ * @param userMonitorData
+ * @param completedJobs
+ * @throws ApplicationSettingsException
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private void updateZkWithJobCount(UserMonitorData userMonitorData, List<MonitorID> completedJobs) {
+ try {
+ final CountDownLatch latch = new CountDownLatch(1);
+ ZooKeeper zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected) {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await();
+
+ try {
+ List<String> updatedPathList = new ArrayList<String>();
+ String pathToUserName = new StringBuilder("/").append(Constants.STAT)
+ .append("/").append(userMonitorData.getUserName()).toString();
+ StringBuilder jobPathBuilder;
+ for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
+ jobPathBuilder = new StringBuilder(pathToUserName).append("/")
+ .append(hostData.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
+ checkAndCreateZNode(zk, jobPathBuilder.toString());
+ int jobCount = 0;
+ List<MonitorID> idList = hostData.getMonitorIDs();
+ if (idList != null) {
+ jobCount = idList.size();
+ // remove jobs already counted for this host from the completed-jobs list
+ for (MonitorID monitorID : idList) {
+ if (completedJobs.contains(monitorID)) {
+ completedJobs.remove(monitorID);
+ }
+ }
+ }
+ zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1);
+ updatedPathList.add(jobPathBuilder.toString());
+ }
+
+ //handle completed jobs
+ /* If all jobs on a host have completed, the monitor queue removes that host from monitoring, but we still need
+ to update that host's stat with a job count of 0 */
+ for (MonitorID monitorID : completedJobs) {
+ jobPathBuilder = new StringBuilder(pathToUserName).append("/")
+ .append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
+ zk.setData(jobPathBuilder.toString(), "0".getBytes(), -1);
+ updatedPathList.add(jobPathBuilder.toString());
+ }
+ // trigger orchestrator watcher by saving the updated list to zookeeper
+ if (updatedPathList.size() > 0) {
+ StringBuilder strBuilder = new StringBuilder();
+ for (String updatedPath : updatedPathList) {
+ strBuilder.append(updatedPath).append(":");
+ }
+ strBuilder.deleteCharAt(strBuilder.length() - 1);
+ zk.setData(("/" + Constants.STAT), strBuilder.toString().getBytes(), -1);
+ }
+ zk.close();
+ } catch (KeeperException e) {
+ logger.error("Error while storing job count to zookeeper", e);
+ } catch (InterruptedException e) {
+ logger.error("Error while storing job count to zookeeper", e);
+ }
+ } catch (IOException e) {
+ logger.error("Error while connecting to the zookeeper server", e);
+ } catch (ApplicationSettingsException e) {
+ logger.error("Error while getting zookeeper hostport property", e);
+ } catch (InterruptedException e) {
+ logger.error("Error while waiting for SyncConnected message" , e);
+ }
+
+ }
+
+ /**
+ * Check whether a znode exists at the given path; if not, create it, first creating any missing parent znodes
+ * @param zk - zookeeper instance
+ * @param path - path to check znode
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
+ if (zk.exists(path, null) == null) { // if znode doesn't exist
+ if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist
+ checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
+ }
+ zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode
+ }
+ }
+
/**
* This is the method to stop the polling process
[2/3] git commit: Merge branch 'master' into StoreGFacsJobCount
Posted by sh...@apache.org.
Merge branch 'master' into StoreGFacsJobCount
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d004df14
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d004df14
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d004df14
Branch: refs/heads/master
Commit: d004df145c9e9be964c5028fca7744e7635fc4eb
Parents: 78ad2ef 68e81ef
Author: shamrath <sh...@gmail.com>
Authored: Thu Sep 11 21:36:59 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Thu Sep 11 21:36:59 2014 -0400
----------------------------------------------------------------------
.../airavata/common/utils/ServerSettings.java | 202 ++++++++++---------
.../server/src/main/resources/PBSTemplate.xslt | 2 +-
.../src/main/resources/SLURMTemplate.xslt | 1 +
.../main/resources/airavata-server.properties | 8 +-
.../airavata/gfac/core/monitor/MonitorID.java | 4 +-
.../gfac/gsissh/util/GFACGSISSHUtils.java | 3 +
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 40 +++-
.../impl/push/amqp/SimpleJobFinishConsumer.java | 76 +++++++
.../airavata/gfac/monitor/util/CommonUtils.java | 6 +-
.../airavata/gfac/ssh/util/GFACSSHUtils.java | 3 +
.../airavata/gsi/ssh/api/job/JobDescriptor.java | 7 +
.../gsi/ssh/api/job/SlurmOutputParser.java | 4 +-
.../gsi/ssh/impl/StandardOutReader.java | 4 +-
.../gsi/ssh/impl/SystemCommandOutput.java | 1 +
.../main/resources/schemas/PBSJobDescriptor.xsd | 1 +
15 files changed, 252 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/d004df14/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
[3/3] git commit: Optimized zookeeper update code,
only update if job count differs from existing job count
Posted by sh...@apache.org.
Optimized zookeeper update code; only update if the job count differs from the existing job count
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/45f0d68f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/45f0d68f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/45f0d68f
Branch: refs/heads/master
Commit: 45f0d68fd3da85f692a899cc371b3247267cac74
Parents: d004df1
Author: shamrath <sh...@gmail.com>
Authored: Thu Sep 11 22:02:56 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Thu Sep 11 22:02:56 2014 -0400
----------------------------------------------------------------------
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 22 ++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/45f0d68f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 6d6108e..d7c8a4e 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -343,6 +343,7 @@ public class HPCPullMonitor extends PullMonitor {
/**
* Build the /stat/{username}/{hostAddress}/job znode path and store job count
+ *
* @param userMonitorData
* @param completedJobs
* @throws ApplicationSettingsException
@@ -373,9 +374,20 @@ public class HPCPullMonitor extends PullMonitor {
.append(hostData.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
checkAndCreateZNode(zk, jobPathBuilder.toString());
int jobCount = 0;
+ String jobCountStr = new String(zk.getData(jobPathBuilder.toString(), null, null));
+ try {
+ jobCount = Integer.parseInt(jobCountStr);
+ } catch (NumberFormatException e) {
+ // unparsable value: do nothing, keep jobCount at 0
+ }
List<MonitorID> idList = hostData.getMonitorIDs();
+ boolean updatePath = true;
if (idList != null) {
- jobCount = idList.size();
+ if (jobCount == idList.size()) {
+ updatePath = false;
+ } else {
+ jobCount = idList.size();
+ }
// removed already updated jobs from complete jobs
for (MonitorID monitorID : idList) {
if (completedJobs.contains(monitorID)) {
@@ -383,8 +395,10 @@ public class HPCPullMonitor extends PullMonitor {
}
}
}
- zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1);
- updatedPathList.add(jobPathBuilder.toString());
+ if (updatePath) {
+ zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1);
+ updatedPathList.add(jobPathBuilder.toString());
+ }
}
//handle completed jobs
@@ -416,7 +430,7 @@ public class HPCPullMonitor extends PullMonitor {
} catch (ApplicationSettingsException e) {
logger.error("Error while getting zookeeper hostport property", e);
} catch (InterruptedException e) {
- logger.error("Error while waiting for SyncConnected message" , e);
+ logger.error("Error while waiting for SyncConnected message", e);
}
}