You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/18 08:24:51 UTC
git commit: fixing synchronization issue with monitoring queue and zk
reconnection issues
Repository: airavata
Updated Branches:
refs/heads/master c65389506 -> 9dac178fb
fixing synchronization issue with monitoring queue and zk reconnection issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9dac178f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9dac178f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9dac178f
Branch: refs/heads/master
Commit: 9dac178fbbe6f6cbad115da0edb2620f936a9410
Parents: c653895
Author: lahiru <la...@apache.org>
Authored: Thu Sep 18 02:24:45 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Thu Sep 18 02:24:45 2014 -0400
----------------------------------------------------------------------
.../org/apache/airavata/gfac/Constants.java | 3 +
.../airavata/gfac/core/monitor/MonitorID.java | 3 +
.../gfac/gsissh/util/GFACGSISSHUtils.java | 3 +-
.../airavata/gfac/monitor/HPCMonitorID.java | 28 +++++-
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 6 +-
.../airavata/gfac/monitor/util/CommonUtils.java | 100 ++++++++++---------
.../airavata/gfac/ssh/util/GFACSSHUtils.java | 4 +-
.../server/OrchestratorServerHandler.java | 1 +
.../apache/airavata/gsi/ssh/api/Cluster.java | 6 ++
9 files changed, 100 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
index b9ecdbe..69d4a4a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
@@ -73,4 +73,7 @@ public class Constants {
public static final String PROPERTY = "property";
public static final String NAME = "name";
public static final String VALUE = "value";
+
+ public static final String SSH_SECURITY_CONTEXT = "ssh";
+ public static final String GSI_SECURITY_CONTEXT = "gsi";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 5344232..eb30e31 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -21,6 +21,9 @@
package org.apache.airavata.gfac.core.monitor;
import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 9e45986..dcdbb4d 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -32,6 +32,7 @@ import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.RequestData;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -105,7 +106,7 @@ public class GFACGSISSHUtils {
} catch (Exception e) {
throw new GFacException("An error occurred while creating GSI security context", e);
}
- jobExecutionContext.addSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT, context);
+ jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
index 0ced88a..a4a131d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -20,8 +20,15 @@ package org.apache.airavata.gfac.monitor;/*
*/
import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -54,8 +61,25 @@ public class HPCMonitorID extends MonitorID {
super(jobExecutionContext);
this.authenticationInfo = authenticationInfo;
if (this.authenticationInfo != null) {
- if (this.authenticationInfo instanceof MyProxyAuthenticationInfo) {
- setUserName(((MyProxyAuthenticationInfo) this.authenticationInfo).getUserName());
+ try {
+ SecurityContext securityContext = jobExecutionContext.getSecurityContext(Constants.GSI_SECURITY_CONTEXT);
+ ServerInfo serverInfo = null;
+ if (securityContext != null) {
+ serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo();
+ }
+ if(serverInfo == null) {
+ securityContext = jobExecutionContext.getSecurityContext(Constants.SSH_SECURITY_CONTEXT);
+ if(securityContext !=null){
+ serverInfo = (((SSHSecurityContext) securityContext).getPbsCluster()).getServerInfo();
+ }
+ }
+ if(serverInfo!=null) {
+ if (serverInfo.getUserName() != null) {
+ setUserName(serverInfo.getUserName());
+ }
+ }
+ } catch (GFacException e) {
+ e.printStackTrace();
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index dac9499..1ed3c5a 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -118,7 +118,11 @@ public class HPCPullMonitor extends PullMonitor {
this.startPulling = true;
while (this.startPulling && !ServerSettings.isStopAllThreads()) {
try {
- startPulling();
+ if (this.queue.size() > 0) {
+ synchronized (this.queue) {
+ startPulling();
+ }
+ }
// After finishing one iteration of the full queue this thread sleeps 1 second
Thread.sleep(10000);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 9cb544c..fb4d898 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -95,36 +95,38 @@ public class CommonUtils {
}
public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
- Iterator<UserMonitorData> iterator = queue.iterator();
- while (iterator.hasNext()) {
- UserMonitorData next = iterator.next();
- if (next.getUserName().equals(monitorID.getUserName())) {
- // then this is the right place to update
- List<HostMonitorData> monitorIDs = next.getHostMonitorData();
- for (HostMonitorData host : monitorIDs) {
- if (host.getHost().toXML().equals(monitorID.getHost().toXML())) {
- // ok we found right place to add this monitorID
- host.addMonitorIDForHost(monitorID);
- return;
+ synchronized (queue) {
+ Iterator<UserMonitorData> iterator = queue.iterator();
+ while (iterator.hasNext()) {
+ UserMonitorData next = iterator.next();
+ if (next.getUserName().equals(monitorID.getUserName())) {
+ // then this is the right place to update
+ List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+ for (HostMonitorData host : monitorIDs) {
+ if (host.getHost().toXML().equals(monitorID.getHost().toXML())) {
+ // ok we found right place to add this monitorID
+ host.addMonitorIDForHost(monitorID);
+ return;
+ }
}
+ // there is a userMonitor object for this user name but no Hosts for this host
+ // so we have to create new Hosts
+ HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+ hostMonitorData.addMonitorIDForHost(monitorID);
+ next.addHostMonitorData(hostMonitorData);
+ return;
}
- // there is a userMonitor object for this user name but no Hosts for this host
- // so we have to create new Hosts
- HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
- hostMonitorData.addMonitorIDForHost(monitorID);
- next.addHostMonitorData(hostMonitorData);
- return;
}
- }
- HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
- hostMonitorData.addMonitorIDForHost(monitorID);
+ HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+ hostMonitorData.addMonitorIDForHost(monitorID);
- UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
- userMonitorData.addHostMonitorData(hostMonitorData);
- try {
- queue.put(userMonitorData);
- } catch (InterruptedException e) {
- throw new AiravataMonitorException(e);
+ UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+ userMonitorData.addHostMonitorData(hostMonitorData);
+ try {
+ queue.put(userMonitorData);
+ } catch (InterruptedException e) {
+ throw new AiravataMonitorException(e);
+ }
}
}
public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){
@@ -138,30 +140,32 @@ public class CommonUtils {
return true;
}
public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
- Iterator<UserMonitorData> iterator = queue.iterator();
- while(iterator.hasNext()){
- UserMonitorData next = iterator.next();
- if(next.getUserName().equals(monitorID.getUserName())){
- // then this is the right place to update
- List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
- for(HostMonitorData iHostMonitorID:hostMonitorData){
- if(iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML())) {
- List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
- for(MonitorID iMonitorID:monitorIDs){
- if(iMonitorID.getJobID().equals(monitorID.getJobID())
- || iMonitorID.getJobName().equals(monitorID.getJobName())) {
- // OK we found the object, we cannot do list.remove(object) states of two objects
- // could be different, thats why we check the jobID
- logger.info("Removing the job:"+ monitorID.getJobID()+" from monitoring last status:" + monitorID.getStatus().toString());
- monitorIDs.remove(iMonitorID);
- if(monitorIDs.size()==0) {
- hostMonitorData.remove(iHostMonitorID);
- if (hostMonitorData.size() == 0) {
- // no useful data so we have to remove the element from the queue
- queue.remove(next);
+ synchronized (queue) {
+ Iterator<UserMonitorData> iterator = queue.iterator();
+ while (iterator.hasNext()) {
+ UserMonitorData next = iterator.next();
+ if (next.getUserName().equals(monitorID.getUserName())) {
+ // then this is the right place to update
+ List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
+ for (HostMonitorData iHostMonitorID : hostMonitorData) {
+ if (iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML())) {
+ List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
+ for (MonitorID iMonitorID : monitorIDs) {
+ if (iMonitorID.getJobID().equals(monitorID.getJobID())
+ || iMonitorID.getJobName().equals(monitorID.getJobName())) {
+ // OK we found the object, we cannot do list.remove(object) states of two objects
+ // could be different, thats why we check the jobID
+ logger.info("Removing the job:" + monitorID.getJobID() + " from monitoring last status:" + monitorID.getStatus().toString());
+ monitorIDs.remove(iMonitorID);
+ if (monitorIDs.size() == 0) {
+ hostMonitorData.remove(iHostMonitorID);
+ if (hostMonitorData.size() == 0) {
+ // no useful data so we have to remove the element from the queue
+ queue.remove(next);
+ }
}
+ return;
}
- return;
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 75bd724..d5dd97b 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -79,7 +79,7 @@ public class GFACSSHUtils {
}
SSHCredential credentials = tokenizedSSHAuthInfo.getCredentials();// this is just a call to get and set credentials in to this object,data will be used
serverInfo.setUserName(credentials.getPortalUserName());
-
+ jobExecutionContext.getExperiment().setUserName(credentials.getPortalUserName());
// inside the pbsCluser object
pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo,
CommonUtils.getPBSJobManager(installedParentPath));
@@ -87,7 +87,7 @@ public class GFACSSHUtils {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
sshSecurityContext.setPbsCluster(pbsCluster);
- jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, sshSecurityContext);
+ jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT, sshSecurityContext);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e6678d3..6d9f259 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -320,6 +320,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
} catch (KeeperException e) {
e.printStackTrace();
}
+ break;
}
if (watchedEvent.getPath() != null
&& watchedEvent.getPath().startsWith(
http://git-wip-us.apache.org/repos/asf/airavata/blob/9dac178f/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
index b3ecc16..ed4b3b4 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
@@ -145,4 +145,10 @@ public interface Cluster {
*/
public void disconnect() throws SSHApiException;
+ /**
+ * This gives the server Info
+ * @return
+ */
+ public ServerInfo getServerInfo();
+
}