You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/29 19:43:16 UTC
[1/2] airavata git commit: Make BetterGfacImpl singleton
Repository: airavata
Updated Branches:
refs/heads/master 7ebe11887 -> 6a5ed6e96
Make BetterGfacImpl singleton
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/957fd5fc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/957fd5fc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/957fd5fc
Branch: refs/heads/master
Commit: 957fd5fc535f1f8b5a48cc3a888c2da174e4e5d8
Parents: 4a06178
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu May 28 20:23:32 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu May 28 20:26:09 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/server/GfacServerHandler.java | 91 +++++++---
.../gfac/bes/provider/impl/BESProvider.java | 6 +-
.../gfac/core/context/JobExecutionContext.java | 11 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 181 +++++++------------
.../org/apache/airavata/gfac/core/cpi/GFac.java | 14 ++
.../gfac/core/handler/AbstractHandler.java | 12 +-
.../gfac/core/provider/AbstractProvider.java | 13 --
.../airavata/gfac/core/utils/GFacUtils.java | 4 +-
.../gfac/core/utils/OutHandlerWorker.java | 4 +-
.../gsissh/provider/impl/GSISSHProvider.java | 20 +-
.../gfac/local/provider/impl/LocalProvider.java | 8 +-
.../gfac/monitor/email/EmailBasedMonitor.java | 4 +-
.../monitor/core/AiravataAbstractMonitor.java | 8 -
.../handlers/GridPullMonitorHandler.java | 7 +-
.../handlers/GridPushMonitorHandler.java | 2 +-
.../gfac/ssh/provider/impl/SSHProvider.java | 43 ++---
.../core/impl/GFACEmbeddedJobSubmitter.java | 10 +-
17 files changed, 206 insertions(+), 232 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 20926d7..961e46d 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -32,9 +32,14 @@ import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
@@ -43,6 +48,8 @@ import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
@@ -62,16 +69,19 @@ import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
+import org.xml.sax.SAXException;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.locks.Lock;
-
public class GfacServerHandler implements GfacService.Iface {
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
@@ -81,15 +91,15 @@ public class GfacServerHandler implements GfacService.Iface {
private AppCatalog appCatalog;
private String gatewayName;
private String airavataUserName;
-// private ZooKeeper zk;
private CuratorFramework curatorClient;
- private static Integer mutex = -1;
- private static Lock lock;
private MonitorPublisher publisher;
private String gfacServer;
private String gfacExperiments;
private String airavataServerHostPort;
private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
+ private static File gfacConfigFile;
+ private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
+ private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
public GfacServerHandler() throws Exception {
try {
@@ -104,17 +114,18 @@ public class GfacServerHandler implements GfacService.Iface {
+ ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
storeServerConfig();
publisher = new MonitorPublisher(new EventBus());
- BetterGfacImpl.setMonitorPublisher(publisher);
registry = RegistryFactory.getDefaultRegistry();
appCatalog = AppCatalogFactory.getAppCatalog();
setGatewayProperties();
- BetterGfacImpl.startDaemonHandlers();
-
+ startDaemonHandlers();
+ // initializing Better Gfac Instance
+ BetterGfacImpl.getInstance().init(registry, appCatalog, curatorClient, publisher);
if (ServerSettings.isGFacPassiveMode()) {
rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
}
- BetterGfacImpl.startStatusUpdators(registry, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
+ startStatusUpdators(registry, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
+
} catch (Exception e) {
throw new Exception("Error initialising GFAC", e);
}
@@ -187,8 +198,8 @@ public class GfacServerHandler implements GfacService.Iface {
requestCount++;
logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------");
logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId);
- GFac gfac = getGfac();
- InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId, tokenId);
+ InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId,
+ taskId, gatewayId, tokenId);
// try {
// if( gfac.submitJob(experimentId, taskId, gatewayId)){
logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
@@ -202,9 +213,8 @@ public class GfacServerHandler implements GfacService.Iface {
public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId);
- GFac gfac = getGfac();
try {
- if (gfac.cancel(experimentId, taskId, gatewayId, tokenId)) {
+ if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) {
logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId);
return true;
} else {
@@ -247,20 +257,55 @@ public class GfacServerHandler implements GfacService.Iface {
}
private GFac getGfac() throws TException {
- try {
- return new BetterGfacImpl(registry, appCatalog, curatorClient, publisher);
+ GFac gFac = BetterGfacImpl.getInstance();
+ gFac.init(registry, appCatalog, curatorClient, publisher);
+ return gFac;
+ }
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
+ public void startDaemonHandlers() {
+ List<GFacHandlerConfig> daemonHandlerConfig = null;
+ String className = null;
+ try {
+ URL resource = GfacServerHandler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ if (resource != null) {
+ gfacConfigFile = new File(resource.getPath());
+ }
+ daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
+ for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
+ className = handlerConfig.getClassName();
+ Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
+ ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
+ threadedHandler.initProperties(handlerConfig.getProperties());
+ daemonHandlers.add(threadedHandler);
+ }
+ } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
+ InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
+ logger.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+ }
+ for (ThreadedHandler tHandler : daemonHandlers) {
+ (new Thread(tHandler)).start();
}
- return null;
-
}
+
+ public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher,
+
+ RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+ try {
+ String[] listenerClassList = ServerSettings.getActivityListeners();
+ Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
+ for (String listenerClass : listenerClassList) {
+ Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+ AbstractActivityListener abstractActivityListener = aClass.newInstance();
+ activityListeners.add(abstractActivityListener);
+ abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
+ logger.info("Registering listener: " + listenerClass);
+ publisher.registerListener(abstractActivityListener);
+ }
+ } catch (Exception e) {
+ logger.error("Error loading the listener classes configured in airavata-server.properties", e);
+ }
+ }
private static class TestHandler implements MessageHandler{
@Override
public Map<String, Object> getProperties() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index dad2a4d..bd98835 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -150,7 +150,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
log.info(String.format("Activity Submitting to %s ... \n",
factoryUrl));
- monitorPublisher.publish(new StartExecutionEvent());
+ jobExecutionContext.getMonitorPublisher().publish(new StartExecutionEvent());
CreateActivityResponseDocument response = factory.createActivity(cad);
log.info(String.format("Activity Submitted to %s \n", factoryUrl));
@@ -169,7 +169,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
jobDetails.setJobDescription(activityEpr.toString());
jobExecutionContext.setJobDetails(jobDetails);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
log.info(formatStatusMessage(activityEpr.getAddress()
.getStringValue(), factory.getActivityStatus(activityEpr)
.toString()));
@@ -458,6 +458,6 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
log.debug(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
"experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
jobStatus.getJobIdentity().getTaskId());
- monitorPublisher.publish(jobStatus);
+ jobExecutionContext.getMonitorPublisher().publish(jobStatus);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 0ca3828..1baf792 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -30,6 +30,7 @@ import java.util.Map;
import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.SecurityContext;
@@ -43,7 +44,6 @@ import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,6 +146,7 @@ public class JobExecutionContext extends AbstractContext implements Serializable
private String status;
private List<String> outputFileList;
private Registry registry;
+ private MonitorPublisher monitorPublisher;
public String getGatewayID() {
return gatewayID;
@@ -495,4 +496,12 @@ public class JobExecutionContext extends AbstractContext implements Serializable
public void setLoginUserName(String loginUserName) {
this.loginUserName = loginUserName;
}
+
+ public MonitorPublisher getMonitorPublisher() {
+ return monitorPublisher;
+ }
+
+ public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
+ this.monitorPublisher = monitorPublisher;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 200ffbe..2cc375e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -23,12 +23,9 @@ package org.apache.airavata.gfac.core.cpi;
import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
@@ -36,7 +33,9 @@ import org.apache.airavata.gfac.Scheduler;
import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.*;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
@@ -46,37 +45,50 @@ import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacHandlerState;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.PublisherFactory;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.appinterface.DataType;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
+import org.apache.airavata.model.appcatalog.computeresource.FileSystems;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.*;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.xpath.XPathExpressionException;
import java.io.File;
-import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
/**
* This is the GFac CPI class for external usage, this simply have a single method to submit a job to
@@ -84,82 +96,35 @@ import java.util.*;
*/
public class BetterGfacImpl implements GFac {
private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
- public static final String ERROR_SENT = "ErrorSent";
+ private static String ERROR_SENT = "ErrorSent";
private Registry registry;
private CuratorFramework curatorClient;
- private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
- private static File gfacConfigFile;
- private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
- private static MonitorPublisher monitorPublisher;
+ private MonitorPublisher monitorPublisher;
+ private static GFac gfacInstance;
+ private boolean initialized = false;
- /**
- * Constructor for GFac
- *
- * @param registry
- * @param curatorClient
- */
- public BetterGfacImpl(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
- MonitorPublisher publisher) throws ApplicationSettingsException, IOException, InterruptedException {
- this.registry = registry;
- monitorPublisher = publisher; // This is a EventBus common for gfac
- this.curatorClient = curatorClient;
- }
-
- public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher,
+ private BetterGfacImpl() {
- RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
- try {
- String[] listenerClassList = ServerSettings.getActivityListeners();
- Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
- for (String listenerClass : listenerClassList) {
- Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
- AbstractActivityListener abstractActivityListener = aClass.newInstance();
- activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
- log.info("Registering listener: " + listenerClass);
- publisher.registerListener(abstractActivityListener);
- }
- } catch (Exception e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- }
}
- public static void startDaemonHandlers() {
- List<GFacHandlerConfig> daemonHandlerConfig = null;
- String className = null;
- try {
- URL resource = BetterGfacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
- if (resource != null) {
- gfacConfigFile = new File(resource.getPath());
- }
- daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
- for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
- className = handlerConfig.getClassName();
- Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
- ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
- threadedHandler.initProperties(handlerConfig.getProperties());
- daemonHandlers.add(threadedHandler);
+ public static GFac getInstance() {
+ if (gfacInstance == null) {
+ synchronized (BetterGfacImpl.class) {
+ if (gfacInstance == null) {
+ gfacInstance = new BetterGfacImpl();
+ }
}
- } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
- InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- }
- for (ThreadedHandler tHandler : daemonHandlers) {
- (new Thread(tHandler)).start();
}
+ return gfacInstance;
}
- /**
- * This can be used to submit jobs for testing purposes just by filling parameters by hand (JobExecutionContext)
- */
- public BetterGfacImpl() {
- daemonHandlers = new ArrayList<ThreadedHandler>();
- startDaemonHandlers();
- }
-
- public BetterGfacImpl(Registry registry) {
- this();
+ @Override
+ public boolean init(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
+ MonitorPublisher publisher) {
this.registry = registry;
+ monitorPublisher = publisher; // This is a EventBus common for gfac
+ this.curatorClient = curatorClient;
+ return initialized = true;
}
@@ -171,7 +136,11 @@ public class BetterGfacImpl implements GFac {
* @return
* @throws GFacException
*/
+ @Override
public boolean submitJob(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
JobExecutionContext jobExecutionContext = null;
try {
jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
@@ -283,8 +252,9 @@ public class BetterGfacImpl implements GFac {
jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
- jobExecutionContext.setGfac(this);
+ jobExecutionContext.setGfac(gfacInstance);
jobExecutionContext.setCuratorClient(curatorClient);
+ jobExecutionContext.setMonitorPublisher(monitorPublisher);
// handle job submission protocol
List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
@@ -502,7 +472,11 @@ public class BetterGfacImpl implements GFac {
}
}
+ @Override
public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
JobExecutionContext jobExecutionContext = null;
try {
jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
@@ -886,7 +860,11 @@ public class BetterGfacImpl implements GFac {
}
}
+ @Override
public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
String experimentPath = null;
try {
experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
@@ -1026,7 +1004,11 @@ public class BetterGfacImpl implements GFac {
}
// TODO - Did refactoring, but need to recheck the logic again.
+ @Override
public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
List<GFacHandlerConfig> handlers = null;
if (gFacConfiguration != null) {
@@ -1118,32 +1100,7 @@ public class BetterGfacImpl implements GFac {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
-
- public static void setMonitorPublisher(MonitorPublisher monitorPublisher) {
- BetterGfacImpl.monitorPublisher = monitorPublisher;
- }
-
- public static List<ThreadedHandler> getDaemonHandlers() {
- return daemonHandlers;
- }
-
- public static String getErrorSent() {
- return ERROR_SENT;
- }
-
- public File getGfacConfigFile() {
- return gfacConfigFile;
- }
-
- public static MonitorPublisher getMonitorPublisher() {
- return monitorPublisher;
- }
-
- public Registry getRegistry() {
- return registry;
- }
-
- public boolean isCancelled(JobExecutionContext executionContext) {
+ private boolean isCancelled(JobExecutionContext executionContext) {
// we should check whether experiment is cancelled using registry
try {
ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
@@ -1161,7 +1118,7 @@ public class BetterGfacImpl implements GFac {
return false;
}
- public boolean isCancelling(JobExecutionContext executionContext) {
+ private boolean isCancelling(JobExecutionContext executionContext) {
// check whether cancelling request came
try {
ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
@@ -1179,7 +1136,7 @@ public class BetterGfacImpl implements GFac {
return false;
}
- public boolean isCancel(JobExecutionContext jobExecutionContext) {
+ private boolean isCancel(JobExecutionContext jobExecutionContext) {
try {
ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, jobExecutionContext.getExperimentID());
if (status != null) {
@@ -1196,4 +1153,6 @@ public class BetterGfacImpl implements GFac {
return false;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
index 5cf9d00..962f0ec 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
@@ -20,8 +20,12 @@
*/
package org.apache.airavata.gfac.core.cpi;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.curator.framework.CuratorFramework;
/**
* This is the GFac CPI interface which needs to be implemented by an internal class, this simply have a single method to submit a job to
@@ -30,6 +34,16 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
public interface GFac {
/**
+ * Initialization method; it must be called once before any other method is used.
+ * @param registry
+ * @param appCatalog
+ * @param curatorClient
+ * @param publisher
+ * @return
+ */
+ public boolean init(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient, MonitorPublisher publisher);
+
+ /**
* This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
* And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
*
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index eef0a33..ecf826d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -42,12 +42,9 @@ public abstract class AbstractHandler implements GFacHandler {
protected MonitorPublisher publisher = null;
- protected AbstractHandler() {
- publisher = BetterGfacImpl.getMonitorPublisher(); // This will not be null because this will be initialize in GFacIml
- }
-
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
try {
+ publisher = jobExecutionContext.getMonitorPublisher();
GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
} catch (Exception e) {
logger.error("Error saving Recoverable provider state", e);
@@ -61,13 +58,6 @@ public abstract class AbstractHandler implements GFacHandler {
}
}
}
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
public Registry getRegistry() {
return registry;
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
index b650482..dc4582a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
@@ -41,12 +41,6 @@ public abstract class AbstractProvider implements GFacProvider{
protected JobStatus status; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
protected JobExecutionContext jobExecutionContext;
- protected MonitorPublisher monitorPublisher;
-
- protected AbstractProvider() { //todo this has to be fixed
- this.monitorPublisher = BetterGfacImpl.getMonitorPublisher();
- }
-
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
log.debug("Initializing " + this.getClass().getName());
if(jobExecutionContext.getRegistry() == null) {
@@ -63,11 +57,4 @@ public abstract class AbstractProvider implements GFacProvider{
this.jobExecutionContext=jobExecutionContext;
}
- public MonitorPublisher getMonitorPublisher() {
- return monitorPublisher;
- }
-
- public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
- this.monitorPublisher = monitorPublisher;
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index f41f29c..09724d5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -222,7 +222,7 @@ public class GFacUtils {
}
public static void saveJobStatus(JobExecutionContext jobExecutionContext,
- JobDetails details, JobState state, MonitorPublisher monitorPublisher) throws GFacException {
+ JobDetails details, JobState state) throws GFacException {
try {
// first we save job details to the registry for sa and then save the job status.
Registry registry = jobExecutionContext.getRegistry();
@@ -236,7 +236,7 @@ public class GFacUtils {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
- monitorPublisher.publish(jobStatusChangeRequestEvent);
+ jobExecutionContext.getMonitorPublisher().publish(jobStatusChangeRequestEvent);
} catch (Exception e) {
throw new GFacException("Error persisting job status"
+ e.getLocalizedMessage(), e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
index f992f10..f027ccb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -53,10 +53,10 @@ public class OutHandlerWorker implements Runnable {
this.jEC = monitorID.getJobExecutionContext();
}
- public OutHandlerWorker(JobExecutionContext jEC, MonitorPublisher monitorPublisher) {
+ public OutHandlerWorker(JobExecutionContext jEC) {
this.jEC = jEC;
this.gfac = jEC.getGfac();
- this.monitorPublisher = monitorPublisher;
+ this.monitorPublisher = jEC.getMonitorPublisher();
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 540eaac..36aac4c 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -113,10 +113,10 @@ public class GSISSHProvider extends AbstractProvider {
jobExecutionContext.setJobDetails(jobDetails);
if (jobID == null) {
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
} else {
jobDetails.setJobID(jobID.split("\\.")[0]);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
}
data.append(",jobId=").append(jobDetails.getJobID());
@@ -128,7 +128,7 @@ public class GSISSHProvider extends AbstractProvider {
String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
@@ -142,7 +142,7 @@ public class GSISSHProvider extends AbstractProvider {
}
public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
- List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+/* List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
if (daemonHandlers == null) {
daemonHandlers = BetterGfacImpl.getDaemonHandlers();
}
@@ -173,7 +173,7 @@ public class GSISSHProvider extends AbstractProvider {
if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
", execution is configured as asynchronous, so Outhandler will not be invoked");
- }
+ }*/
}
public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
@@ -208,21 +208,21 @@ public class GSISSHProvider extends AbstractProvider {
log.error("No Job Id is set, so cannot perform the cancel operation !!!");
return false;
}
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
return true;
// we know this host is type GsiSSHHostType
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} catch (Exception e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
}
@@ -310,7 +310,7 @@ public class GSISSHProvider extends AbstractProvider {
return;
}
}
-
+/*
// if email monitor is not activeated or not configure we use pull or push monitor
List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
if (daemonHandlers == null) {
@@ -346,6 +346,6 @@ public class GSISSHProvider extends AbstractProvider {
log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
", execution is configured as asynchronous, so Outhandler will not be invoked");
- }
+ }*/
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index ef66171..871193a 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -136,8 +136,8 @@ public class LocalProvider extends AbstractProvider {
jobDetails.setJobDescription(jobExecutionContext.getApplicationContext()
.getApplicationDeploymentDescription().getAppDeploymentDescription());
jobExecutionContext.setJobDetails(jobDetails);
- GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP, monitorPublisher);
- // running cmd
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP);
+ // running cmd
Process process = builder.start();
Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput());
@@ -180,7 +180,7 @@ public class LocalProvider extends AbstractProvider {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- this.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
+ jobExecutionContext.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
} catch (IOException io) {
throw new GFacProviderException(io.getMessage(), io);
} catch (InterruptedException e) {
@@ -236,7 +236,7 @@ public class LocalProvider extends AbstractProvider {
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
+ jobExecutionContext.getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
} catch (XmlException e) {
throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
} catch (IOException io) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 535a8b3..07934a4 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -294,7 +294,7 @@ public class EmailBasedMonitor implements Runnable{
if (runOutHandlers) {
log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher()));
+ GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC));
}
}
@@ -313,7 +313,7 @@ public class EmailBasedMonitor implements Runnable{
"experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
jobStatus.getJobIdentity().getTaskId());
- BetterGfacImpl.getMonitorPublisher().publish(jobStatus);
+ jobExecutionContext.getMonitorPublisher().publish(jobStatus);
}
private void writeEnvelopeOnError(Message m) throws MessagingException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
index c754d3c..b4ac3a9 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -34,13 +34,5 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AiravataAbstractMonitor implements Monitor {
private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
- protected MonitorPublisher publisher;
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 2476c85..10192b7 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -67,11 +67,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
String myProxyServer = ServerSettings.getSetting("myproxy.server");
setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
7512, 17280000, certPath));
- if(BetterGfacImpl.getMonitorPublisher() != null){
- hpcPullMonitor = new HPCPullMonitor(BetterGfacImpl.getMonitorPublisher(),getAuthenticationInfo()); // we use our own credentials for monitoring, not from the store
- }else {
- throw new GFacHandlerException("Error initializing Monitor Handler, because Monitor Publisher is null !!!");
- }
+ hpcPullMonitor = new HPCPullMonitor(null,getAuthenticationInfo()); // we use our own credentials for monitoring, not from the store
} catch (ApplicationSettingsException e) {
logger.error("Error while reading server properties", e);
throw new GFacHandlerException("Error while reading server properties", e);
@@ -85,6 +81,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
super.invoke(jobExecutionContext);
hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
+ hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher());
MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
try {
/* ZooKeeper zk = jobExecutionContext.getZk();
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
index bb36b28..8b445df 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
@@ -67,7 +67,7 @@ public class GridPushMonitorHandler extends ThreadedHandler {
LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>();
LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>();
List<String> hosts= Arrays.asList(hostList.split(","));
- amqpMonitor=new AMQPMonitor(BetterGfacImpl.getMonitorPublisher(),pushQueue,finishQueue,proxyFilePath,connectionName,hosts);
+ amqpMonitor=new AMQPMonitor(null,pushQueue,finishQueue,proxyFilePath,connectionName,hosts);
}catch (ApplicationSettingsException e){
logger.error(e.getMessage(), e);
throw new GFacHandlerException(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index f967dbf..cc6cca0 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -24,8 +24,10 @@ package org.apache.airavata.gfac.ssh.provider.impl;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.ExecutionMode;
+import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
@@ -70,6 +72,7 @@ import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.*;
+import java.net.URL;
import java.util.*;
/**
@@ -107,7 +110,7 @@ public class SSHProvider extends AbstractProvider {
JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster);
details.setJobDescription(jobDescriptor.toXML());
- GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
log.info(remoteFile);
File runscript = createShellScript(jobExecutionContext);
cluster.scpTo(remoteFile, runscript.getAbsolutePath());
@@ -146,6 +149,7 @@ public class SSHProvider extends AbstractProvider {
jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
JobDetails jobDetails = new JobDetails();
String hostAddress = jobExecutionContext.getHostName();
+ MonitorPublisher monitorPublisher = jobExecutionContext.getMonitorPublisher();
try {
Cluster cluster = null;
if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
@@ -165,14 +169,14 @@ public class SSHProvider extends AbstractProvider {
String jobID = cluster.submitBatchJob(jobDescriptor);
if (jobID != null && !jobID.isEmpty()) {
jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED, monitorPublisher);
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
- , GfacExperimentState.JOBSUBMITTED));
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.JOBSUBMITTED));
jobExecutionContext.setJobDetails(jobDetails);
if (verifyJobSubmissionByJobId(cluster, jobID)) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.JOBSUBMITTED));
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
}
} else {
jobExecutionContext.setJobDetails(jobDetails);
@@ -183,7 +187,7 @@ public class SSHProvider extends AbstractProvider {
jobDetails.setJobID(jobID);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.JOBSUBMITTED));
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
}
}
@@ -199,14 +203,14 @@ public class SSHProvider extends AbstractProvider {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} catch (Exception e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} finally {
@@ -262,7 +266,7 @@ public class SSHProvider extends AbstractProvider {
if (jobDetails.getJobID() != null) {
if (cluster.cancelJob(jobDetails.getJobID()) != null) {
// if this operation success without any exceptions, we can assume cancel operation succeeded.
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
return true;
} else {
log.info("Job Cancel operation failed");
@@ -460,25 +464,10 @@ public class SSHProvider extends AbstractProvider {
}
return;
}
+ } else {
+ throw new IllegalArgumentException("Monitoring is implemented only for SSH, "
+ + jobExecutionContext.getPreferredJobSubmissionProtocol().name() + " is not yet implemented");
}
- // if email monitor is not activeated or not configure we use pull or push monitor
- List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- if (daemonHandlers == null) {
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- for (ThreadedHandler threadedHandler : daemonHandlers) {
- if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pullMonitorHandler = threadedHandler;
- pullMonitorHandler.invoke(jobExecutionContext);
- }
- // have to handle the GridPushMonitorHandler logic
- }
- if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
- }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/957fd5fc/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
index d6fe40f..e93ae71 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
@@ -54,15 +54,7 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter {
public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
this.orchestratorContext = orchestratorContext;
- try {
- gfac = new BetterGfacImpl(orchestratorContext.getNewRegistry(),null, null, new MonitorPublisher(new EventBus()));
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- }
+ gfac = BetterGfacImpl.getInstance();
}
public GFACInstance selectGFACInstance() throws OrchestratorException {
[2/2] airavata git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6a5ed6e9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6a5ed6e9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6a5ed6e9
Branch: refs/heads/master
Commit: 6a5ed6e96366cac6297afa6861218b69dca6afb8
Parents: 957fd5f 7ebe118
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Fri May 29 13:42:58 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Fri May 29 13:42:58 2015 -0400
----------------------------------------------------------------------
airavata-api/airavata-api-server/pom.xml | 2 +-
airavata-api/airavata-api-stubs/pom.xml | 2 +-
airavata-api/airavata-client-sdks/airavata-cpp-sdk/pom.xml | 2 +-
airavata-api/airavata-client-sdks/airavata-php-sdk/pom.xml | 2 +-
.../airavata-client-sdks/airavata-python-sdk/pom.xml | 2 +-
.../airavata-client-sdks/java-client-samples/pom.xml | 2 +-
airavata-api/airavata-client-sdks/pom.xml | 2 +-
airavata-api/airavata-data-models/pom.xml | 2 +-
airavata-api/airavata-model-utils/pom.xml | 2 +-
airavata-api/pom.xml | 2 +-
modules/app-catalog/app-catalog-cpi/pom.xml | 2 +-
modules/app-catalog/app-catalog-data/pom.xml | 2 +-
modules/app-catalog/pom.xml | 2 +-
modules/commons/pom.xml | 2 +-
modules/commons/utils/pom.xml | 2 +-
modules/configuration/client/pom.xml | 2 +-
modules/configuration/pom.xml | 2 +-
modules/configuration/server/pom.xml | 2 +-
modules/credential-store/credential-store-service/pom.xml | 5 ++---
modules/credential-store/credential-store-stubs/pom.xml | 2 +-
modules/credential-store/credential-store-webapp/pom.xml | 2 +-
modules/credential-store/pom.xml | 2 +-
modules/distribution/api-server/pom.xml | 2 +-
modules/distribution/client/java/pom.xml | 5 ++---
modules/distribution/client/pom.xml | 2 +-
modules/distribution/gfac-server/pom.xml | 2 +-
modules/distribution/new-dist/pom.xml | 5 ++---
modules/distribution/orchestrator-server/pom.xml | 2 +-
modules/distribution/pom.xml | 2 +-
modules/distribution/release/pom.xml | 2 +-
modules/distribution/server/pom.xml | 5 ++---
modules/distribution/xbaya-gui/pom.xml | 2 +-
modules/gfac/airavata-gfac-service/pom.xml | 2 +-
modules/gfac/airavata-gfac-stubs/pom.xml | 2 +-
modules/gfac/gfac-application-specific-handlers/pom.xml | 6 ++----
modules/gfac/gfac-bes/pom.xml | 5 ++---
modules/gfac/gfac-core/pom.xml | 2 +-
modules/gfac/gfac-gsissh/pom.xml | 2 +-
modules/gfac/gfac-local/pom.xml | 2 +-
modules/gfac/gfac-monitor/gfac-email-monitor/pom.xml | 6 ++----
modules/gfac/gfac-monitor/gfac-hpc-monitor/pom.xml | 6 ++----
modules/gfac/gfac-monitor/pom.xml | 2 +-
modules/gfac/gfac-ssh/pom.xml | 2 +-
modules/gfac/pom.xml | 2 +-
modules/messaging/client/pom.xml | 2 +-
modules/messaging/core/pom.xml | 2 +-
modules/messaging/pom.xml | 2 +-
modules/orchestrator/airavata-orchestrator-service/pom.xml | 2 +-
modules/orchestrator/airavata-orchestrator-stubs/pom.xml | 2 +-
modules/orchestrator/orchestrator-core/pom.xml | 2 +-
modules/orchestrator/pom.xml | 2 +-
modules/registry/airavata-jpa-registry/pom.xml | 2 +-
modules/registry/pom.xml | 2 +-
modules/registry/registry-cpi/pom.xml | 2 +-
modules/security/pom.xml | 2 +-
modules/server/pom.xml | 2 +-
modules/test-suite/multi-tenanted-airavata/pom.xml | 5 ++---
modules/test-suite/pom.xml | 2 +-
modules/workflow-model/pom.xml | 2 +-
modules/workflow-model/workflow-engine/pom.xml | 2 +-
modules/workflow-model/workflow-model-component/pom.xml | 2 +-
modules/workflow-model/workflow-model-core/pom.xml | 2 +-
modules/workflow/pom.xml | 8 +++-----
modules/workflow/workflow-core/pom.xml | 6 ++----
modules/xbaya-gui/pom.xml | 2 +-
pom.xml | 2 +-
tools/gsissh/pom.xml | 2 +-
tools/pom.xml | 2 +-
tools/registry-tool/pom.xml | 2 +-
69 files changed, 81 insertions(+), 97 deletions(-)
----------------------------------------------------------------------