You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2014/09/12 04:43:42 UTC

[1/3] git commit: GFac pull monitor Write job count to zookeeper

Repository: airavata
Updated Branches:
  refs/heads/master 68e81ef80 -> 45f0d68fd


GFac pull monitor Write job count to zookeeper


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/78ad2ef5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/78ad2ef5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/78ad2ef5

Branch: refs/heads/master
Commit: 78ad2ef58b04051bc5c41791b7c03f90da914e5c
Parents: 92ad9f1
Author: shamrath <sh...@gmail.com>
Authored: Thu Sep 11 21:35:41 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Thu Sep 11 21:35:41 2014 -0400

----------------------------------------------------------------------
 .../apache/airavata/common/utils/Constants.java |   2 +
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 109 ++++++++++++++++++-
 2 files changed, 110 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index b8f999a..8335e0c 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -46,4 +46,6 @@ public final class Constants {
     public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
     public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
     public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
+    public static final String STAT = "stat";
+    public static final String JOB = "job";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 93e1aa9..3742179 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.gfac.monitor.impl.pull.qstat;
 
+import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Date;
@@ -28,9 +29,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -48,10 +53,15 @@ import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.JobStatus;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -242,6 +252,7 @@ public class HPCPullMonitor extends PullMonitor {
             for (MonitorID completedJob : completedJobs) {
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
             }
+            updateZkWithJobCount(take , completedJobs);
         } catch (InterruptedException e) {
             if (!this.queue.contains(take)) {
                 try {
@@ -304,6 +315,102 @@ public class HPCPullMonitor extends PullMonitor {
         return true;
     }
 
+    /**
+     * Build the /stat/{username}/{hostAddress}/job znode path and store job count
+     * @param userMonitorData
+     * @param completedJobs
+     * @throws ApplicationSettingsException
+     *         NOTE(review): not actually propagated. This exception, IOException,
+     *         KeeperException and InterruptedException are all caught and logged inside
+     *         the method body, so callers never see them.
+     */
+    private void updateZkWithJobCount(UserMonitorData userMonitorData, List<MonitorID> completedJobs) {
+        try {
+            final CountDownLatch latch = new CountDownLatch(1); // released by the watcher below on SyncConnected
+            ZooKeeper zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                    if (event.getState() == Event.KeeperState.SyncConnected) {
+                        latch.countDown();
+                    }
+                }
+            });
+            latch.await(); // NOTE(review): waits with no timeout for the watcher's countDown
+
+            try {
+                List<String> updatedPathList = new ArrayList<String>(); // every znode path written in this call
+                String pathToUserName = new StringBuilder("/").append(Constants.STAT)
+                        .append("/").append(userMonitorData.getUserName()).toString();
+                StringBuilder jobPathBuilder;
+                for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
+                    jobPathBuilder = new StringBuilder(pathToUserName).append("/")
+                            .append(hostData.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
+                    checkAndCreateZNode(zk, jobPathBuilder.toString());
+                    int jobCount = 0;
+                    List<MonitorID> idList = hostData.getMonitorIDs();
+                    if (idList != null) {
+                        jobCount = idList.size();
+                        // removed already updated jobs from complete jobs
+                        for (MonitorID monitorID : idList) {
+                            if (completedJobs.contains(monitorID)) {
+                                completedJobs.remove(monitorID);
+                            }
+                        }
+                    }
+                    zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1);
+                    updatedPathList.add(jobPathBuilder.toString());
+                }
+
+                //handle completed jobs
+                /* When every job on a host has completed, the monitor queue stops monitoring that host, but its
+                     stat znode must still be updated with a job count of 0 */
+                for (MonitorID monitorID : completedJobs) {
+                    jobPathBuilder = new StringBuilder(pathToUserName).append("/")
+                            .append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
+                    zk.setData(jobPathBuilder.toString(), "0".getBytes(), -1); // NOTE(review): no checkAndCreateZNode here; presumably the znode already exists - confirm
+                    updatedPathList.add(jobPathBuilder.toString());
+                }
+                // trigger orchestrator watcher by saving the ':'-separated list of updated paths to the /stat znode
+                if (updatedPathList.size() > 0) {
+                    StringBuilder strBuilder = new StringBuilder();
+                    for (String updatedPath : updatedPathList) {
+                        strBuilder.append(updatedPath).append(":");
+                    }
+                    strBuilder.deleteCharAt(strBuilder.length() - 1); // drop the trailing ':'
+                    zk.setData(("/" + Constants.STAT), strBuilder.toString().getBytes(), -1);
+                }
+                zk.close(); // NOTE(review): skipped when any call above throws; consider closing in a finally block
+            } catch (KeeperException e) {
+                logger.error("Error while storing job count to zookeeper", e);
+            } catch (InterruptedException e) { // NOTE(review): only logged; the thread's interrupt flag is not re-set
+                logger.error("Error while storing job count to zookeeper", e);
+            }
+        } catch (IOException e) {
+            logger.error("Error while connecting to the zookeeper server", e);
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error while getting zookeeper hostport property", e);
+        } catch (InterruptedException e) {
+            logger.error("Error while waiting for SyncConnected message" , e);
+        }
+
+    }
+
+    /**
+     * Ensure a znode exists at the given path, creating missing parent znodes first and then the znode itself
+     * @param zk - zookeeper instance used for the exists and create calls
+     * @param path - znode path to check and, if absent, create
+     * @throws KeeperException propagated from the zk.exists / zk.create calls
+     * @throws InterruptedException propagated from the zk.exists / zk.create calls
+     */
+    private void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
+        if (zk.exists(path, null) == null) { // only act when the znode does not exist yet
+            if (path.lastIndexOf("/") > 1) {  // path has a parent segment: recurse so the parent exists before this znode
+                checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
+            }
+            zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a persistent znode with no data
+        }
+    }
+
 
     /**
      * This is the method to stop the polling process


[2/3] git commit: Merge branch 'master' into StoreGFacsJobCount

Posted by sh...@apache.org.
Merge branch 'master' into StoreGFacsJobCount


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d004df14
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d004df14
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d004df14

Branch: refs/heads/master
Commit: d004df145c9e9be964c5028fca7744e7635fc4eb
Parents: 78ad2ef 68e81ef
Author: shamrath <sh...@gmail.com>
Authored: Thu Sep 11 21:36:59 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Thu Sep 11 21:36:59 2014 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   | 202 ++++++++++---------
 .../server/src/main/resources/PBSTemplate.xslt  |   2 +-
 .../src/main/resources/SLURMTemplate.xslt       |   1 +
 .../main/resources/airavata-server.properties   |   8 +-
 .../airavata/gfac/core/monitor/MonitorID.java   |   4 +-
 .../gfac/gsissh/util/GFACGSISSHUtils.java       |   3 +
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |  40 +++-
 .../impl/push/amqp/SimpleJobFinishConsumer.java |  76 +++++++
 .../airavata/gfac/monitor/util/CommonUtils.java |   6 +-
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    |   3 +
 .../airavata/gsi/ssh/api/job/JobDescriptor.java |   7 +
 .../gsi/ssh/api/job/SlurmOutputParser.java      |   4 +-
 .../gsi/ssh/impl/StandardOutReader.java         |   4 +-
 .../gsi/ssh/impl/SystemCommandOutput.java       |   1 +
 .../main/resources/schemas/PBSJobDescriptor.xsd |   1 +
 15 files changed, 252 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/d004df14/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------


[3/3] git commit: Optimized zookeeper update code, only update if job count is differ from exist job count

Posted by sh...@apache.org.
Optimized zookeeper update code, only update if job count is differ from exist job count


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/45f0d68f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/45f0d68f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/45f0d68f

Branch: refs/heads/master
Commit: 45f0d68fd3da85f692a899cc371b3247267cac74
Parents: d004df1
Author: shamrath <sh...@gmail.com>
Authored: Thu Sep 11 22:02:56 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Thu Sep 11 22:02:56 2014 -0400

----------------------------------------------------------------------
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 22 ++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/45f0d68f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 6d6108e..d7c8a4e 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -343,6 +343,7 @@ public class HPCPullMonitor extends PullMonitor {
 
     /**
      * Build the /stat/{username}/{hostAddress}/job znode path and store job count
+     *
      * @param userMonitorData
      * @param completedJobs
      * @throws ApplicationSettingsException
@@ -373,9 +374,20 @@ public class HPCPullMonitor extends PullMonitor {
                             .append(hostData.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
                     checkAndCreateZNode(zk, jobPathBuilder.toString());
                     int jobCount = 0;
+                    String jobCountStr = new String(zk.getData(jobPathBuilder.toString(), null, null));
+                    try {
+                        jobCount = Integer.parseInt(jobCountStr);
+                    } catch (NumberFormatException e) {
+                        // do nothing , keep jobCount 0
+                    }
                     List<MonitorID> idList = hostData.getMonitorIDs();
+                    boolean updatePath = true;
                     if (idList != null) {
-                        jobCount = idList.size();
+                        if (jobCount == idList.size()) {
+                            updatePath = false;
+                        } else {
+                            jobCount = idList.size();
+                        }
                         // removed already updated jobs from complete jobs
                         for (MonitorID monitorID : idList) {
                             if (completedJobs.contains(monitorID)) {
@@ -383,8 +395,10 @@ public class HPCPullMonitor extends PullMonitor {
                             }
                         }
                     }
-                    zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1);
-                    updatedPathList.add(jobPathBuilder.toString());
+                    if (updatePath) {
+                        zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(), -1);
+                        updatedPathList.add(jobPathBuilder.toString());
+                    }
                 }
 
                 //handle completed jobs
@@ -416,7 +430,7 @@ public class HPCPullMonitor extends PullMonitor {
         } catch (ApplicationSettingsException e) {
             logger.error("Error while getting zookeeper hostport property", e);
         } catch (InterruptedException e) {
-            logger.error("Error while waiting for SyncConnected message" , e);
+            logger.error("Error while waiting for SyncConnected message", e);
         }
 
     }