You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/18 08:24:51 UTC

git commit: Fix monitoring-queue synchronization and ZooKeeper (zk) reconnection issues

Repository: airavata
Updated Branches:
  refs/heads/master c65389506 -> 9dac178fb


Fix monitoring-queue synchronization and ZooKeeper (zk) reconnection issues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9dac178f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9dac178f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9dac178f

Branch: refs/heads/master
Commit: 9dac178fbbe6f6cbad115da0edb2620f936a9410
Parents: c653895
Author: lahiru <la...@apache.org>
Authored: Thu Sep 18 02:24:45 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Thu Sep 18 02:24:45 2014 -0400

----------------------------------------------------------------------
 .../org/apache/airavata/gfac/Constants.java     |   3 +
 .../airavata/gfac/core/monitor/MonitorID.java   |   3 +
 .../gfac/gsissh/util/GFACGSISSHUtils.java       |   3 +-
 .../airavata/gfac/monitor/HPCMonitorID.java     |  28 +++++-
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |   6 +-
 .../airavata/gfac/monitor/util/CommonUtils.java | 100 ++++++++++---------
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    |   4 +-
 .../server/OrchestratorServerHandler.java       |   1 +
 .../apache/airavata/gsi/ssh/api/Cluster.java    |   6 ++
 9 files changed, 100 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
index b9ecdbe..69d4a4a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
@@ -73,4 +73,7 @@ public class Constants {
     public static final String PROPERTY = "property";
     public static final String NAME = "name";
     public static final String VALUE = "value";
+
+    public static final String SSH_SECURITY_CONTEXT = "ssh";
+    public static final String GSI_SECURITY_CONTEXT = "gsi";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 5344232..eb30e31 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -21,6 +21,9 @@
 package org.apache.airavata.gfac.core.monitor;
 
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 9e45986..dcdbb4d 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -32,6 +32,7 @@ import org.apache.airavata.common.utils.StringUtil;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.RequestData;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -105,7 +106,7 @@ public class GFACGSISSHUtils {
             } catch (Exception e) {
                 throw new GFacException("An error occurred while creating GSI security context", e);
             }
-            jobExecutionContext.addSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT, context);
+            jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
index 0ced88a..a4a131d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -20,8 +20,15 @@ package org.apache.airavata.gfac.monitor;/*
 */
 
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -54,8 +61,25 @@ public class HPCMonitorID extends MonitorID {
         super(jobExecutionContext);
         this.authenticationInfo = authenticationInfo;
         if (this.authenticationInfo != null) {
-            if (this.authenticationInfo instanceof MyProxyAuthenticationInfo) {
-                setUserName(((MyProxyAuthenticationInfo) this.authenticationInfo).getUserName());
+            try {
+                SecurityContext securityContext = jobExecutionContext.getSecurityContext(Constants.GSI_SECURITY_CONTEXT);
+                ServerInfo serverInfo = null;
+                if (securityContext != null) {
+                    serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo();
+                }
+                if(serverInfo == null) {
+                    securityContext = jobExecutionContext.getSecurityContext(Constants.SSH_SECURITY_CONTEXT);
+                    if(securityContext !=null){
+                        serverInfo = (((SSHSecurityContext) securityContext).getPbsCluster()).getServerInfo();
+                    }
+                }
+                if(serverInfo!=null) {
+                    if (serverInfo.getUserName() != null) {
+                        setUserName(serverInfo.getUserName());
+                    }
+                }
+            } catch (GFacException e) {
+                e.printStackTrace();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index dac9499..1ed3c5a 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -118,7 +118,11 @@ public class HPCPullMonitor extends PullMonitor {
         this.startPulling = true;
         while (this.startPulling && !ServerSettings.isStopAllThreads()) {
             try {
-                startPulling();
+                if (this.queue.size() > 0) {
+                    synchronized (this.queue) {
+                        startPulling();
+                    }
+                }
                 // After finishing one iteration of the full queue this thread sleeps 1 second
                 Thread.sleep(10000);
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 9cb544c..fb4d898 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -95,36 +95,38 @@ public class CommonUtils {
     }
 
     public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
-        Iterator<UserMonitorData> iterator = queue.iterator();
-        while (iterator.hasNext()) {
-            UserMonitorData next = iterator.next();
-            if (next.getUserName().equals(monitorID.getUserName())) {
-                // then this is the right place to update
-                List<HostMonitorData> monitorIDs = next.getHostMonitorData();
-                for (HostMonitorData host : monitorIDs) {
-                    if (host.getHost().toXML().equals(monitorID.getHost().toXML())) {
-                        // ok we found right place to add this monitorID
-                        host.addMonitorIDForHost(monitorID);
-                        return;
+        synchronized (queue) {
+            Iterator<UserMonitorData> iterator = queue.iterator();
+            while (iterator.hasNext()) {
+                UserMonitorData next = iterator.next();
+                if (next.getUserName().equals(monitorID.getUserName())) {
+                    // then this is the right place to update
+                    List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+                    for (HostMonitorData host : monitorIDs) {
+                        if (host.getHost().toXML().equals(monitorID.getHost().toXML())) {
+                            // ok we found right place to add this monitorID
+                            host.addMonitorIDForHost(monitorID);
+                            return;
+                        }
                     }
+                    // there is a userMonitor object for this user name but no Hosts for this host
+                    // so we have to create new Hosts
+                    HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+                    hostMonitorData.addMonitorIDForHost(monitorID);
+                    next.addHostMonitorData(hostMonitorData);
+                    return;
                 }
-                // there is a userMonitor object for this user name but no Hosts for this host
-                // so we have to create new Hosts
-                HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
-                hostMonitorData.addMonitorIDForHost(monitorID);
-                next.addHostMonitorData(hostMonitorData);
-                return;
             }
-        }
-        HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
-        hostMonitorData.addMonitorIDForHost(monitorID);
+            HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+            hostMonitorData.addMonitorIDForHost(monitorID);
 
-        UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
-        userMonitorData.addHostMonitorData(hostMonitorData);
-        try {
-            queue.put(userMonitorData);
-        } catch (InterruptedException e) {
-            throw new AiravataMonitorException(e);
+            UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+            userMonitorData.addHostMonitorData(hostMonitorData);
+            try {
+                queue.put(userMonitorData);
+            } catch (InterruptedException e) {
+                throw new AiravataMonitorException(e);
+            }
         }
     }
     public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){
@@ -138,30 +140,32 @@ public class CommonUtils {
         return true;
     }
     public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
-        Iterator<UserMonitorData> iterator = queue.iterator();
-        while(iterator.hasNext()){
-            UserMonitorData next = iterator.next();
-            if(next.getUserName().equals(monitorID.getUserName())){
-                // then this is the right place to update
-                List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
-                for(HostMonitorData iHostMonitorID:hostMonitorData){
-                    if(iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML())) {
-                        List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
-                        for(MonitorID iMonitorID:monitorIDs){
-                            if(iMonitorID.getJobID().equals(monitorID.getJobID())
-                                    || iMonitorID.getJobName().equals(monitorID.getJobName())) {
-                                // OK we found the object, we cannot do list.remove(object) states of two objects
-                                // could be different, thats why we check the jobID
-                                logger.info("Removing the job:"+ monitorID.getJobID()+" from monitoring last status:" + monitorID.getStatus().toString());
-                                monitorIDs.remove(iMonitorID);
-                                if(monitorIDs.size()==0) {
-                                    hostMonitorData.remove(iHostMonitorID);
-                                    if (hostMonitorData.size() == 0) {
-                                        // no useful data so we have to remove the element from the queue
-                                        queue.remove(next);
+        synchronized (queue) {
+            Iterator<UserMonitorData> iterator = queue.iterator();
+            while (iterator.hasNext()) {
+                UserMonitorData next = iterator.next();
+                if (next.getUserName().equals(monitorID.getUserName())) {
+                    // then this is the right place to update
+                    List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
+                    for (HostMonitorData iHostMonitorID : hostMonitorData) {
+                        if (iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML())) {
+                            List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
+                            for (MonitorID iMonitorID : monitorIDs) {
+                                if (iMonitorID.getJobID().equals(monitorID.getJobID())
+                                        || iMonitorID.getJobName().equals(monitorID.getJobName())) {
+                                    // OK we found the object, we cannot do list.remove(object) states of two objects
+                                    // could be different, thats why we check the jobID
+                                    logger.info("Removing the job:" + monitorID.getJobID() + " from monitoring last status:" + monitorID.getStatus().toString());
+                                    monitorIDs.remove(iMonitorID);
+                                    if (monitorIDs.size() == 0) {
+                                        hostMonitorData.remove(iHostMonitorID);
+                                        if (hostMonitorData.size() == 0) {
+                                            // no useful data so we have to remove the element from the queue
+                                            queue.remove(next);
+                                        }
                                     }
+                                    return;
                                 }
-                                return;
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 75bd724..d5dd97b 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -79,7 +79,7 @@ public class GFACSSHUtils {
                 }
                 SSHCredential credentials = tokenizedSSHAuthInfo.getCredentials();// this is just a call to get and set credentials in to this object,data will be used
                 serverInfo.setUserName(credentials.getPortalUserName());
-
+                jobExecutionContext.getExperiment().setUserName(credentials.getPortalUserName());
                 // inside the pbsCluser object
                 pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo,
                         CommonUtils.getPBSJobManager(installedParentPath));
@@ -87,7 +87,7 @@ public class GFACSSHUtils {
                 e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
             }
             sshSecurityContext.setPbsCluster(pbsCluster);
-            jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, sshSecurityContext);
+            jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT, sshSecurityContext);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e6678d3..6d9f259 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -320,6 +320,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
                         } catch (KeeperException e) {
                             e.printStackTrace();
                         }
+                        break;
                 }
 				if (watchedEvent.getPath() != null
 						&& watchedEvent.getPath().startsWith(

http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
index b3ecc16..ed4b3b4 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
@@ -145,4 +145,10 @@ public interface Cluster {
      */
     public void disconnect() throws SSHApiException;
 
+    /**
+     * This gives the server Info
+     * @return
+     */
+    public ServerInfo getServerInfo();
+
 }