You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 23:06:26 UTC
git commit: Fixing the build and more issues related to gfac
Repository: airavata
Updated Branches:
refs/heads/master 39370c526 -> 14c1841ca
Fixing the build and more issues related to gfac
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/14c1841c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/14c1841c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/14c1841c
Branch: refs/heads/master
Commit: 14c1841ca20d08bc1e9da254dbce5cb8a7266b26
Parents: 39370c5
Author: lahiru <la...@apache.org>
Authored: Mon May 5 17:06:10 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon May 5 17:06:10 2014 -0400
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 3 +-
.../airavata/common/utils/ServerSettings.java | 5 ++
.../server/src/main/resources/gfac-config.xml | 6 +-
.../org/apache/airavata/gfac/core/cpi/GFac.java | 7 ---
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 46 +++++++++++++--
.../gfac/core/handler/AbstractHandler.java | 25 ++++++++-
.../core/monitor/AiravataJobStatusUpdator.java | 18 ------
.../gfac/core/provider/AbstractProvider.java | 19 ++++++-
.../gsissh/provider/impl/GSISSHProvider.java | 2 +-
.../gfac/local/provider/impl/LocalProvider.java | 10 ++++
.../gfac/services/impl/LocalProviderTest.java | 7 +++
.../airavata/gfac/monitor/HPCMonitorID.java | 2 +-
.../handlers/GridPullMonitorHandler.java | 29 +---------
.../gfac/monitor/impl/LocalJobMonitor.java | 59 --------------------
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 7 +++
.../apache/airavata/job/AMQPMonitorTest.java | 2 +-
.../core/impl/EmbeddedGFACJobSubmitter.java | 8 ---
.../orchestrator/core/job/JobSubmitter.java | 7 ---
18 files changed, 123 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 89bab76..f207720 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -64,8 +64,9 @@ public class CreateLaunchExperiment {
// addDescriptors();
// final String expId = createExperimentForSSHHost(airavata);
// final String expId = createExperimentForSSHHost(airavata);
- final String expId = createExperimentForTrestles(airavata);
+// final String expId = createExperimentForTrestles(airavata);
// final String expId = createExperimentForStampede(airavata);
+ final String expId = createExperimentForLocalHost(airavata);
System.out.println("Experiment ID : " + expId);
String clonedExpId = cloneExperiment(airavata, expId);
System.out.println("Cloned Experiment ID : " + clonedExpId);
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 1380955..71e1437 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -65,6 +65,8 @@ public class ServerSettings extends ApplicationSettings{
private static final String MY_PROXY_USER = "myproxy.user";
private static final String MY_PROXY_PASSWORD = "myproxy.password";
private static final String MY_PROXY_LIFETIME = "myproxy.life";
+ private static final String ACTIVITY_LISTENERS = "activity.listeners";
+
private static boolean stopAllThreads = false;
public static String getDefaultUser() throws ApplicationSettingsException{
@@ -213,4 +215,7 @@ public class ServerSettings extends ApplicationSettings{
public static int getMyProxyLifetime() throws ApplicationSettingsException {
return Integer.parseInt(getSetting(MY_PROXY_LIFETIME));
}
+ public static String[] getActivityListeners() throws ApplicationSettingsException {
+ return getSetting(ACTIVITY_LISTENERS).split(",");
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/configuration/server/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml
index 29ef0f8..6d81a59 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.xml
+++ b/modules/configuration/server/src/main/resources/gfac-config.xml
@@ -12,13 +12,11 @@
<GFac>
<DaemonHandlers>
- <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler">
- <property name="listeners" value="org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator"/>
- </Handler>
+ <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler"/>
</DaemonHandlers>
<GlobalHandlers>
<InHandlers>
- <Handler class="org.apache.airavata.gfac.core.handlerAppDescriptorCheckHandler">
+ <Handler class="org.apache.airavata.gfac.core.handler.AppDescriptorCheckHandler">
<property name="name" value="value"/>
</Handler>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
index c0f7984..60e499f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
@@ -39,11 +39,4 @@ public interface GFac {
*/
public JobExecutionContext submitJob(String experimentID,String taskID) throws GFacException;
- /**
- * This method has to be invoked after submitting the job and have to make sure job is properly finished
- * @param jobExecutionContext
- * @throws GFacException
- */
- public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
-
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index 15b0d72..1d5c6f9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -28,7 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import com.google.common.eventbus.EventBus;
import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -40,6 +42,8 @@ import org.apache.airavata.gfac.Scheduler;
import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.monitor.AbstractActivityListener;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
@@ -81,6 +85,11 @@ public class GFacImpl implements GFac {
private static List<ThreadedHandler> daemonHandlers;
private File gfacConfigFile;
+
+ private List<AbstractActivityListener> activityListeners;
+
+ private static MonitorPublisher monitorPublisher;
+
/**
* Constructor for GFac
*
@@ -93,9 +102,32 @@ public class GFacImpl implements GFac {
this.airavataAPI = airavataAPI;
this.airavataRegistry2 = airavataRegistry2;
daemonHandlers = new ArrayList<ThreadedHandler>();
+ activityListeners = new ArrayList<AbstractActivityListener>();
+ monitorPublisher = new MonitorPublisher(new EventBus()); // This is a EventBus common for gfac
+ startStatusUpdators();
startDaemonHandlers();
}
+ private void startStatusUpdators() {
+ try {
+ String[] listenerClassList = ServerSettings.getActivityListeners();
+ for (String listenerClass : listenerClassList) {
+ Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+ AbstractActivityListener abstractActivityListener = aClass.newInstance();
+ activityListeners.add(abstractActivityListener);
+ abstractActivityListener.setup(getMonitorPublisher(),registry);
+ getMonitorPublisher().registerListener(abstractActivityListener);
+ }
+ }catch (ClassNotFoundException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (InstantiationException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (IllegalAccessException e) {
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ } catch (ApplicationSettingsException e){
+ log.error("Error loading the listener classes configured in airavata-server.properties",e);
+ }
+ }
private void startDaemonHandlers() {
List<GFacHandlerConfig> daemonHandlerConfig = null;
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
@@ -205,15 +237,15 @@ public class GFacImpl implements GFac {
// start constructing jobexecutioncontext
jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+
+ // setting experiment/task/workflownode related information
Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentID);
jobExecutionContext.setExperiment(experiment);
jobExecutionContext.setExperimentID(experimentID);
-
+ jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
jobExecutionContext.setTaskData(taskData);
-
-
-
+ // setting the registry
jobExecutionContext.setRegistry(registry);
ApplicationContext applicationContext = new ApplicationContext();
@@ -399,5 +431,11 @@ public class GFacImpl implements GFac {
return gfacConfigFile;
}
+ public static MonitorPublisher getMonitorPublisher() {
+ return monitorPublisher;
+ }
+ public Registry getRegistry() {
+ return registry;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index dcbc79d..0049c3c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -21,17 +21,40 @@
package org.apache.airavata.gfac.core.handler;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
public abstract class AbstractHandler implements GFacHandler {
protected Registry registry = null;
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ protected MonitorPublisher publisher = null;
+
+ protected AbstractHandler() {
+ publisher = GFacImpl.getMonitorPublisher(); // This will not be null because this will be initialize in GFacIml
+ }
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
registry = jobExecutionContext.getRegistry();
if(registry == null){
registry = RegistryFactory.getDefaultRegistry();
}
}
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+
+ public Registry getRegistry() {
+ return registry;
+ }
+
+ public void setRegistry(Registry registry) {
+ this.registry = registry;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index aaf7084..254d7fc 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -43,8 +43,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
private MonitorPublisher monitorPublisher;
- private BlockingQueue<MonitorID> jobsToMonitor;
-
public Registry getAiravataRegistry() {
return airavataRegistry;
@@ -54,13 +52,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
this.airavataRegistry = airavataRegistry;
}
- public BlockingQueue<MonitorID> getJobsToMonitor() {
- return jobsToMonitor;
- }
-
- public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
- this.jobsToMonitor = jobsToMonitor;
- }
@Subscribe
public void updateRegistry(JobStatusChangeRequest jobStatus) {
@@ -77,13 +68,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
- switch (state) {
- case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- default:
- break;
- }
}
}
@@ -135,8 +119,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
for (Object configuration : configurations) {
if (configuration instanceof Registry){
this.airavataRegistry=(Registry)configuration;
- } else if (configuration instanceof BlockingQueue<?>){
- this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
} else if (configuration instanceof MonitorPublisher){
this.monitorPublisher=(MonitorPublisher) configuration;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
index 4d28d72..5b65185 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
@@ -21,8 +21,11 @@
package org.apache.airavata.gfac.core.provider;
+import com.google.common.eventbus.EventBus;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobStatus;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -38,11 +41,25 @@ public abstract class AbstractProvider implements GFacProvider{
protected JobStatus status; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
protected JobExecutionContext jobExecutionContext;
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ protected MonitorPublisher monitorPublisher;
+
+ protected AbstractProvider() {
+ this.monitorPublisher = GFacImpl.getMonitorPublisher();
+ }
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
log.debug("Initializing " + this.getClass().getName());
registry = RegistryFactory.getDefaultRegistry();
details = new JobDetails();
status = new JobStatus();
this.jobExecutionContext=jobExecutionContext;
}
+
+ public MonitorPublisher getMonitorPublisher() {
+ return monitorPublisher;
+ }
+
+ public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
+ this.monitorPublisher = monitorPublisher;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 9e66b71..880cfbd 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -103,7 +103,7 @@ public class GSISSHProvider extends AbstractProvider {
pullMonitorHandler = threadedHandler;
}
}
- // we know this hos is type GsiSSHHostType
+ // we know this host is type GsiSSHHostType
String monitorMode = ((GsisshHostType) host).getMonitorMode();
if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 5494629..8e78f5a 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -29,6 +29,9 @@ import java.util.Map;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.monitor.JobIdentity;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
@@ -163,6 +166,13 @@ public class LocalProvider extends AbstractProvider {
.append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
.append(String.valueOf(returnValue));
log.info(buf.toString());
+ MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(),jobId,
+ jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getExperiment().getUserName());
+ JobStatusChangeRequest jobStatusChangeRequest = new JobStatusChangeRequest(monitorID);
+ jobStatusChangeRequest.setState(JobState.COMPLETE);
+ this.getMonitorPublisher().publish(jobStatusChangeRequest);
} catch (IOException io) {
throw new GFacProviderException(io.getMessage(), io);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index df628e4..a6f689f 100644
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -20,16 +20,20 @@
*/
package org.apache.airavata.core.gfac.services.impl;
+import com.google.common.eventbus.EventBus;
import org.apache.airavata.commons.gfac.type.*;
import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
+import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.InputParameterType;
@@ -141,8 +145,10 @@ public class LocalProviderTest {
jobExecutionContext.setOutMessageContext(outMessage);
jobExecutionContext.setExperimentID("test123");
+ jobExecutionContext.setExperiment(new Experiment("test123","project1","admin","testExp"));
jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID()));
jobExecutionContext.setRegistry(new LoggingRegistryImpl());
+ jobExecutionContext.setWorkflowNodeDetails(new WorkflowNodeDetails(jobExecutionContext.getExperimentID(),"none"));
}
@@ -165,6 +171,7 @@ public class LocalProviderTest {
LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
localDirectorySetupHandler.invoke(jobExecutionContext);
LocalProvider localProvider = new LocalProvider();
+ localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus()));
localProvider.initialize(jobExecutionContext);
localProvider.execute(jobExecutionContext);
localProvider.dispose(jobExecutionContext);
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
index da4ff88..942f6ae 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -80,7 +80,7 @@ public class HPCMonitorID extends MonitorID {
jobID = jobExecutionContext.getJobDetails().getJobID();
taskID = jobExecutionContext.getTaskData().getTaskID();
experimentID = jobExecutionContext.getExperiment().getExperimentID();
- workflowNodeID = jobExecutionContext.getExperiment().getWorkflowNodeDetailsList().get(0).getNodeInstanceId();// at this point we only have one node todo: fix this
+ workflowNodeID = jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index a87b8a5..a9d2e73 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.monitor.handlers;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.monitor.AbstractActivityListener;
@@ -53,11 +54,7 @@ public class GridPullMonitorHandler extends ThreadedHandler {
private AuthenticationInfo authenticationInfo;
- private List<AbstractActivityListener> activityListeners;
-
- boolean registrySet = false;
public void initProperties(Map<String, String> properties) throws GFacHandlerException {
- activityListeners = new ArrayList<AbstractActivityListener>();
String myProxyUser = null;
try {
myProxyUser = ServerSettings.getSetting("myproxy.username");
@@ -66,24 +63,9 @@ public class GridPullMonitorHandler extends ThreadedHandler {
String myProxyServer = ServerSettings.getSetting("myproxy.server");
setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
7512, 17280000, certPath));
- hpcPullMonitor = new HPCPullMonitor();
- String listeners = properties.get("listeners");
- String[] split = listeners.split(",");
- for (String listenerClass : split) {
- Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
- AbstractActivityListener abstractActivityListener = aClass.newInstance();
- activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(hpcPullMonitor.getQueue(), hpcPullMonitor.getPublisher());
- hpcPullMonitor.getPublisher().registerListener(abstractActivityListener);
- }
+ hpcPullMonitor = new HPCPullMonitor(GFacImpl.getMonitorPublisher());
} catch (ApplicationSettingsException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (ClassNotFoundException e) {
- logger.error("Error loading the listener classes configured in gfac-config.xml");
- } catch (InstantiationException e) {
- logger.error("Error loading the listener classes configured in gfac-config.xml");
- } catch (IllegalAccessException e) {
- logger.error("Error loading the listener classes configured in gfac-config.xml");
}
}
@@ -92,13 +74,8 @@ public class GridPullMonitorHandler extends ThreadedHandler {
}
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- if(!registrySet){
- for(AbstractActivityListener listener:activityListeners){
- listener.setup(jobExecutionContext.getRegistry());
- }
- }
super.invoke(jobExecutionContext);
- MonitorID monitorID = new HPCMonitorID(authenticationInfo, jobExecutionContext);
+ MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
try {
CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
} catch (AiravataMonitorException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
deleted file mode 100644
index 3c87d7d..0000000
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl;
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.JobIdentity;
-import org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor;
-import org.apache.airavata.model.workspace.experiment.JobState;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This monitor can be used to monitor a job which runs locally,
- * Since its a local job job doesn't have states, once it get executed
- * then the job starts running
- */
-public class LocalJobMonitor extends AiravataAbstractMonitor {
- // Though we have a qeuue here, it not going to be used in local jobs
- BlockingQueue<MonitorID> jobQueue;
-
- public void run() {
- do {
- try {
- MonitorID take = jobQueue.take();
- getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE));
- } catch (Exception e) {
- e.printStackTrace();
- }
- } while (!ServerSettings.isStopAllThreads());
- }
-
- public BlockingQueue<MonitorID> getJobQueue() {
- return jobQueue;
- }
-
- public void setJobQueue(BlockingQueue<MonitorID> jobQueue) {
- this.jobQueue = jobQueue;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 38453e8..38b631c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -63,6 +63,13 @@ public class HPCPullMonitor extends PullMonitor {
this.queue = new LinkedBlockingDeque<UserMonitorData>();
publisher = new MonitorPublisher(new EventBus());
}
+
+ public HPCPullMonitor(MonitorPublisher monitorPublisher){
+ connections = new HashMap<String, ResourceConnection>();
+ this.queue = new LinkedBlockingDeque<UserMonitorData>();
+ publisher = monitorPublisher;
+ }
+
public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
this.queue = queue;
this.publisher = publisher;
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 2c34df9..9c3d08b 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -62,7 +62,7 @@ public class AMQPMonitorTest {
@Before
public void setUp() throws Exception {
System.setProperty("myproxy.username", "ogce");
- System.setProperty("myproxy.password", "OpenGwy14");
+ System.setProperty("myproxy.password", "");
System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
System.setProperty("gsi.working.directory", "/home1/01437/ogce");
System.setProperty("trusted.cert.location", "/Users/lahirugunathilake/Downloads/certificates");
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index cbc564d..90b07a2 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -82,12 +82,4 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
this.orchestratorContext = orchestratorContext;
}
-
- public void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException {
- try {
- gfac.invokeOutFlowHandlers(jobExecutionContext);
- } catch (GFacException e) {
- throw new OrchestratorException(e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index b1787a3..1fd5269 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -52,11 +52,4 @@ public interface JobSubmitter {
* @return JobExecutionContext return the jobExecutionContext from GFac
*/
JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException;
-
- /**
- * This can be use to handle any after Jobsubmission task
- * @param jobExecutionContext
- * @throws OrchestratorException
- */
- void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException;
}