You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2018/03/12 04:35:38 UTC
[1/2] oodt git commit: Stabilized Avro RPC resource manager client
and server by: 1. Adding 3 more RPC methods to be backward compatible with
the existing RPC client. 2. Adding tests for both client and server by adapting
the XML-RPC version's tests. 3. Confirming that these tests are passing.
Repository: oodt
Updated Branches:
refs/heads/development ba6ca1b4d -> 9feacbb01
Stabilized the Avro RPC resource manager client and server by:
1. Adding 3 more RPC methods to be backward compatible with the existing RPC client.
2. Adding tests for both client and server by adapting the XML-RPC version's tests.
3. Confirming that these tests are passing.
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/1b921b5c
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/1b921b5c
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/1b921b5c
Branch: refs/heads/development
Commit: 1b921b5c64783d7e929cd435057fbdb2188e47c9
Parents: ba6ca1b
Author: Imesha Sudasingha <im...@gmail.com>
Authored: Fri Mar 2 23:18:14 2018 +0530
Committer: Imesha Sudasingha <im...@gmail.com>
Committed: Fri Mar 2 23:18:14 2018 +0530
----------------------------------------------------------------------
.../org/apache/oodt/commons/AvroExecServer.java | 11 -
.../avro/types/resource_manager_protocol.avdl | 6 +
resource/src/main/bin/resmgr | 3 +-
.../oodt/cas/resource/scheduler/Scheduler.java | 93 ++++---
.../resource/system/AvroRpcResourceManager.java | 241 +++++++++++++------
.../system/AvroRpcResourceManagerClient.java | 40 ++-
.../cas/resource/system/ResourceManager.java | 12 +-
.../resource/system/ResourceManagerClient.java | 11 +-
.../resource/system/ResourceManagerMain.java | 66 +++++
.../resource/system/XmlRpcResourceManager.java | 86 +++----
.../system/XmlRpcResourceManagerClient.java | 46 ++--
.../system/TestAvroRpcResourceManager.java | 64 ++---
.../TestAvroRpcResourceManagerClient.java | 211 ++++++++++++++++
.../system/TestXmlRpcResourceManager.java | 3 +-
.../system/TestXmlRpcResourceManagerClient.java | 238 +++++++++---------
.../TestDistributedAvroRpcResourceManager.java | 85 +++++++
.../TestDistributedXmlRpcResourceManager.java | 1 +
17 files changed, 851 insertions(+), 366 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java b/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
index eb1c786..c779acd 100644
--- a/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
+++ b/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
@@ -17,16 +17,9 @@
package org.apache.oodt.commons;
-import org.apache.avro.Protocol;
-import org.apache.avro.ipc.HttpServer;
-import org.apache.avro.ipc.Responder;
-import org.apache.avro.ipc.Server;
-import org.apache.avro.ipc.generic.GenericResponder;
-import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.oodt.commons.io.Base64EncodingOutputStream;
import org.apache.oodt.commons.util.LogInit;
import org.apache.oodt.commons.util.XML;
-import org.apache.xmlrpc.XmlRpcServer;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.DocumentType;
@@ -69,10 +62,6 @@ public class AvroExecServer {
/** The <log> element within the status document. */
private Element logElement;
- /** The XML-RPC interface to this server. */
- private HttpServer server;
-
-
/** Status DTD Document Type Definition formal public identifier. */
public static final String STATUS_FPI = "-//JPL//DTD EDA Server Status 1.0";
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/avro/types/resource_manager_protocol.avdl
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/resource_manager_protocol.avdl b/resource/src/main/avro/types/resource_manager_protocol.avdl
index 8b3f43f..f5d6725 100644
--- a/resource/src/main/avro/types/resource_manager_protocol.avdl
+++ b/resource/src/main/avro/types/resource_manager_protocol.avdl
@@ -20,6 +20,12 @@ import schema "AvroResourceNode.avsc";
string getExecutionNode(string jobId);
+ string getExecReport();
+
+ string getNodeReport();
+
+ array<AvroJob> getQueuedJobs();
+
string handleJob(AvroJob exec, AvroJobInput into);
boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, string hostUrl);
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/bin/resmgr
----------------------------------------------------------------------
diff --git a/resource/src/main/bin/resmgr b/resource/src/main/bin/resmgr
index da6ae29..f80cd90 100644
--- a/resource/src/main/bin/resmgr
+++ b/resource/src/main/bin/resmgr
@@ -51,7 +51,8 @@ case "$1" in
$JAVA_HOME/bin/java -Djava.ext.dirs=${CAS_RESMGR_HOME}/lib \
-Djava.util.logging.config.file=${CAS_RESMGR_HOME}/etc/logging.properties \
-Dorg.apache.oodt.cas.resource.properties=${CAS_RESMGR_PROPS} \
- org.apache.oodt.cas.resource.system.XmlRpcResourceManager --portNum $SERVER_PORT &
+ -Dresmgr.manager=org.apache.oodt.cas.resource.system.AvroRpcResourceManager \
+ org.apache.oodt.cas.resource.system.ResourceManagerMain --portNum $SERVER_PORT &
echo $! >${RUN_HOME}/cas.resmgr.pid
echo "OK"
sleep 5
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java b/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
index 764527e..991722b 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
@@ -19,10 +19,10 @@
package org.apache.oodt.cas.resource.scheduler;
//OODT imports
+
import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
import org.apache.oodt.cas.resource.jobqueue.JobQueue;
import org.apache.oodt.cas.resource.monitor.Monitor;
-import org.apache.oodt.cas.resource.scheduler.QueueManager;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
@@ -31,62 +31,57 @@ import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
* @author woollard
* @author bfoster
* @version $Revision$
- *
+ * <p>
* <p>
* A scheduler interface.
* </p>
- *
*/
-public interface Scheduler extends Runnable{
+public interface Scheduler extends Runnable {
- /**
- * Schedules a job to be executed by a particular batch manager.
- *
- * @param spec
- * The {@link JobSpec} to schedule for execution.
- * @return Whether the job was successfully scheduled or not.
- * @throws SchedulerException If there was any error scheduling
- * the given {@link JobSpec}.
- */
+ /**
+ * Schedules a job to be executed by a particular batch manager.
+ *
+ * @param spec The {@link JobSpec} to schedule for execution.
+ * @return Whether the job was successfully scheduled or not.
+ * @throws SchedulerException If there was any error scheduling
+ * the given {@link JobSpec}.
+ */
boolean schedule(JobSpec spec) throws SchedulerException;
- /**
- * Returns the ResourceNode that is considered to be <quote>most available</quote>
- * within our underlying set of resources for the given JobSpec.
- * @param spec The JobSpec to find an available node for.
- * @return The {@link ResourceNode} best suited to handle this {@link JobSpec}
- * @throws SchedulerException If any error occurs.
- */
- ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException;
+ /**
+ * Returns the ResourceNode that is considered to be <quote>most available</quote>
+ * within our underlying set of resources for the given JobSpec.
+ *
+ * @param spec The JobSpec to find an available node for.
+ * @return The {@link ResourceNode} best suited to handle this {@link JobSpec}
+ * @throws SchedulerException If any error occurs.
+ */
+ ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException;
+
+ /**
+ * @return The underlying {@link Monitor} used by this
+ * Scheduler.
+ */
+ Monitor getMonitor();
+
+ /**
+ * @return The underlying {@link Batchmgr} used by this
+ * Scheduler.
+ */
+ Batchmgr getBatchmgr();
+
+
+ /**
+ * @return The underlying {@link JobQueue} used by this
+ * Scheduler.
+ */
+ JobQueue getJobQueue();
- /**
- *
- * @return The underlying {@link Monitor} used by this
- * Scheduler.
- */
- Monitor getMonitor();
-
- /**
- *
- * @return The underlying {@link Batchmgr} used by this
- * Scheduler.
- */
- Batchmgr getBatchmgr();
-
-
- /**
- *
- * @return The underlying {@link JobQueue} used by this
- * Scheduler.
- */
- JobQueue getJobQueue();
+ /**
+ * @return The underlying {@link QueueManager} used by this
+ * Scheduler.
+ */
+ QueueManager getQueueManager();
- /**
- *
- * @return The underlying {@link QueueManager} used by this
- * Scheduler.
- */
- QueueManager getQueueManager();
-
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
index d224cf5..fc3e2ae 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
@@ -22,72 +22,91 @@ import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.oodt.cas.resource.scheduler.Scheduler;
-import org.apache.oodt.cas.resource.structs.*;
-import org.apache.oodt.cas.resource.structs.avrotypes.*;
-import org.apache.oodt.cas.resource.structs.exceptions.*;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJob;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJobInput;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
-import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
-import org.apache.xmlrpc.WebServer;
-
-import java.io.File;
-import java.io.FileInputStream;
+import org.apache.oodt.cas.resource.util.ResourceNodeComparator;
+import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigurationManager;
+import org.apache.oodt.config.ConfigurationManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.Hashtable;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Vector;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager{
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
- private int port = 2000;
+public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager {
- private Logger LOG = Logger
- .getLogger(XmlRpcResourceManager.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(AvroRpcResourceManager.class);
+ private int port = 2000;
private Server server;
+ /** our scheduler */
+ private Scheduler scheduler;
+ /** Configuration Manager instance of this instance */
+ private ConfigurationManager configurationManager;
+ private ExecutorService executorService;
- /* our scheduler */
- private Scheduler scheduler = null;
+ public AvroRpcResourceManager(int port) {
+ this.port = port;
- public AvroRpcResourceManager(int port) throws Exception{
- // load properties from workflow manager properties file, if specified
+ List<String> propertiesFiles = new ArrayList<>();
+ // set up the configuration, if there is any
if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) {
- String configFile = System
- .getProperty("org.apache.oodt.cas.resource.properties");
- LOG.log(Level.INFO,
- "Loading Resource Manager Configuration Properties from: ["
- + configFile + "]");
- System.getProperties().load(
- new FileInputStream(new File(configFile)));
+ propertiesFiles.add(System.getProperty("org.apache.oodt.cas.resource.properties"));
}
- String schedulerClassStr = System.getProperty(
- "resource.scheduler.factory",
- "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory");
+ configurationManager = ConfigurationManagerFactory
+ .getConfigurationManager(Component.RESOURCE_MANAGER, propertiesFiles);
+ }
- scheduler = GenericResourceManagerObjectFactory
- .getSchedulerServiceFromFactory(schedulerClassStr);
+ @Override
+ public void startUp() throws Exception {
+ try {
+ configurationManager.loadConfiguration();
+ } catch (Exception e) {
+ logger.error("Unable to load configuration", e);
+ throw new IOException("Unable to load configuration", e);
+ }
- // start up the scheduler
- new Thread(scheduler).start();
+ String schedulerClassStr = System.getProperty("resource.scheduler.factory",
+ "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory");
+ scheduler = GenericResourceManagerObjectFactory.getSchedulerServiceFromFactory(schedulerClassStr);
- this.port = port;
+ // start up the scheduler
+ executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(scheduler);
// start up the web server
- server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class,this),
+ server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class, this),
new InetSocketAddress(this.port));
server.start();
- LOG.log(Level.INFO, "Resource Manager started by "
- + System.getProperty("user.name", "unknown"));
-
+ logger.info("Resource Manager started by {}", System.getProperty("user.name", "unknown"));
}
@Override
- public boolean isAlive() throws AvroRemoteException {
+ public boolean isAlive() {
return true;
}
@@ -95,7 +114,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
public int getJobQueueSize() throws AvroRemoteException {
try {
return this.scheduler.getJobQueue().getSize();
- }catch (Exception e) {
+ } catch (Exception e) {
throw new AvroRemoteException(new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e));
}
}
@@ -105,7 +124,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
public int getJobQueueCapacity() throws AvroRemoteException {
try {
return this.scheduler.getJobQueue().getCapacity();
- }catch (Exception e) {
+ } catch (Exception e) {
throw new AvroRemoteException(new JobRepositoryException("Failed to get capacity of JobQueue : " + e.getMessage(), e));
}
}
@@ -117,10 +136,11 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
jobId);
return scheduler.getJobQueue().getJobRepository().jobFinished(spec);
- } catch(JobRepositoryException e ){
+ } catch (JobRepositoryException e) {
throw new AvroRemoteException(e);
}
}
+
@Override
public AvroJob getJobInfo(String jobId) throws AvroRemoteException {
JobSpec spec = null;
@@ -129,15 +149,12 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
spec = scheduler.getJobQueue().getJobRepository()
.getJobById(jobId);
} catch (JobRepositoryException e) {
- LOG.log(Level.WARNING,
- "Exception communicating with job repository for job: ["
- + jobId + "]: Message: " + e.getMessage());
+ logger.warn("Exception communicating with job repository for job: [{}]: Message: {}", jobId, e.getMessage());
throw new AvroRemoteException(new JobRepositoryException("Unable to get job: [" + jobId
+ "] from repository!"));
}
return AvroTypeFactory.getAvroJob(spec.getJob());
-
}
@Override
@@ -152,7 +169,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
@Override
public boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, String hostUrl) throws AvroRemoteException {
try {
- return genericHandleJob(exec,in,hostUrl);
+ return genericHandleJob(exec, in, hostUrl);
} catch (JobExecutionException e) {
throw new AvroRemoteException(e);
}
@@ -186,9 +203,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
public boolean killJob(String jobId) throws AvroRemoteException {
String resNodeId = scheduler.getBatchmgr().getExecutionNode(jobId);
if (resNodeId == null) {
- LOG.log(Level.WARNING, "Attempt to kill job: [" + jobId
- + "]: cannot find execution node"
- + " (has the job already finished?)");
+ logger.warn("Attempt to kill job: [{}]: cannot find execution node (has the job already finished?)", jobId);
return false;
}
ResourceNode node = null;
@@ -205,14 +220,101 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
public String getExecutionNode(String jobId) throws AvroRemoteException {
String execNode = scheduler.getBatchmgr().getExecutionNode(jobId);
if (execNode == null) {
- LOG.log(Level.WARNING, "Job: [" + jobId
- + "] not currently executing on any known node");
+ logger.warn("Job: [{}] not currently executing on any known node", jobId);
return "";
} else
return execNode;
}
@Override
+ public String getNodeReport() {
+ StringBuilder report = new StringBuilder();
+
+ try {
+
+ // get a sorted list of nodes
+ List nodes = scheduler.getMonitor().getNodes();
+ Collections.sort(nodes, new ResourceNodeComparator());
+
+ // formulate the report string
+ for (Object node1 : nodes) {
+ ResourceNode node = (ResourceNode) node1;
+ String nodeId = node.getNodeId();
+ report.append(nodeId);
+ report.append(" (").append(getNodeLoad(nodeId)).append("/").append(node.getCapacity()).append(")");
+ List<String> nodeQueues = getQueuesWithNode(nodeId);
+ if (nodeQueues != null && nodeQueues.size() > 0) {
+ report.append(" -- ").append(nodeQueues.get(0));
+ for (int j = 1; j < nodeQueues.size(); j++) {
+ report.append(", ").append(nodeQueues.get(j));
+ }
+ }
+ report.append("\n");
+ }
+ } catch (Exception e) {
+ return null;
+ }
+
+ return report.toString();
+ }
+
+ public List<AvroJob> getQueuedJobs() {
+ List<AvroJob> jobs = new ArrayList<>();
+ List jobSpecs = this.scheduler.getJobQueue().getQueuedJobs();
+
+ if (jobSpecs != null && jobSpecs.size() > 0) {
+ for (Object jobSpec : jobSpecs) {
+ Job job = ((JobSpec) jobSpec).getJob();
+ jobs.add(AvroTypeFactory.getAvroJob(job));
+ }
+ }
+
+ return jobs;
+ }
+
+ @Override
+ public String getExecReport() {
+ StringBuilder report = new StringBuilder();
+
+ try {
+
+ // get a sorted list of all nodes, since the report should be
+ // alphabetically sorted by node
+ List resNodes = scheduler.getMonitor().getNodes();
+ if (resNodes.size() == 0) {
+ throw new MonitorException(
+ "No jobs can be executing, as there are no nodes in the Monitor");
+ }
+ Vector<String> nodeIds = new Vector<String>();
+ for (Object resNode : resNodes) {
+ nodeIds.add(((ResourceNode) resNode).getNodeId());
+ }
+ Collections.sort(nodeIds);
+
+ // generate the report string
+ for (String nodeId : nodeIds) {
+ List execJobIds = this.scheduler.getBatchmgr().getJobsOnNode(nodeId);
+ if (execJobIds != null && execJobIds.size() > 0) {
+ for (Object execJobId : execJobIds) {
+ String jobId = (String) execJobId;
+ Job job = scheduler.getJobQueue().getJobRepository()
+ .getJobById(jobId).getJob();
+ report.append("job id=").append(jobId);
+ report.append(", load=").append(job.getLoadValue());
+ report.append(", node=").append(nodeId);
+ report.append(", queue=").append(job.getQueueName()).append("\n");
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ return null;
+ }
+
+ return report.toString();
+ }
+
+ @Override
public List<String> getQueues() throws AvroRemoteException {
try {
return this.scheduler.getQueueManager().getQueues();
@@ -255,12 +357,12 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
@Override
public boolean removeNode(String nodeId) throws AvroRemoteException {
- try{
- for(String queueName: this.getQueuesWithNode(nodeId)){
+ try {
+ for (String queueName : this.getQueuesWithNode(nodeId)) {
this.removeNodeFromQueue(nodeId, queueName);
}
this.scheduler.getMonitor().removeNodeById(nodeId);
- }catch(Exception e){
+ } catch (Exception e) {
throw new AvroRemoteException(new MonitorException(e.getMessage(), e));
}
@@ -307,13 +409,18 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
}
}
- public boolean shutdown(){
+ @Override
+ public boolean shutdown() {
+ configurationManager.clearConfiguration();
+ executorService.shutdownNow();
+
if (this.server != null) {
this.server.close();
this.server = null;
return true;
- } else
+ } else {
return false;
+ }
}
@Override
@@ -346,7 +453,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
AvroRpcResourceManager manager = new AvroRpcResourceManager(portNum);
- for (;;)
+ for (; ; )
try {
Thread.currentThread().join();
} catch (InterruptedException ignore) {
@@ -356,15 +463,13 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
@Override
public boolean setNodeCapacity(String nodeId, int capacity) throws AvroRemoteException {
- try{
+ try {
this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity);
- }catch (MonitorException e){
- LOG.log(Level.WARNING, "Exception setting capacity on node "
- + nodeId + ": " + e.getMessage());
+ } catch (MonitorException e) {
+ logger.warn("Exception setting capacity on node {}: ", nodeId, e.getMessage());
return false;
}
return true;
-
}
@@ -381,15 +486,14 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
try {
jobId = scheduler.getJobQueue().addJob(spec);
} catch (JobQueueException e) {
- LOG.log(Level.WARNING, "JobQueue exception adding job: Message: "
- + e.getMessage());
+ logger.warn("JobQueue exception adding job: Message: {}", e.getMessage());
throw new SchedulerException(e.getMessage());
}
return jobId;
}
private boolean genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput,
- String urlStr) throws JobExecutionException {
+ String urlStr) throws JobExecutionException {
Job exec = AvroTypeFactory.getJob(avroJob);
JobInput in = AvroTypeFactory.getJobInput(avroJobInput);
@@ -415,8 +519,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
try {
url = new URL(urlStr);
} catch (MalformedURLException e) {
- LOG.log(Level.WARNING, "Error converting string: [" + urlStr
- + "] to URL object: Message: " + e.getMessage());
+ logger.warn("Error converting string: [{}] to URL object: Message: {}", urlStr, e.getMessage());
}
return url;
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
index fa0e84b..4dd0f33 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
@@ -26,11 +26,12 @@ import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
import org.apache.oodt.cas.resource.structs.Job;
import org.apache.oodt.cas.resource.structs.JobInput;
import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager;
import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
-import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager;
import java.io.File;
import java.io.FileInputStream;
@@ -155,9 +156,29 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
}
@Override
+ public String getNodeReport() throws MonitorException {
+ try {
+ return proxy.getNodeReport();
+ } catch (AvroRemoteException e) {
+ LOG.log(Level.SEVERE, "Server error!");
+ }
+ return null;
+ }
+
+ @Override
+ public String getExecReport() throws JobRepositoryException {
+ try {
+ return proxy.getExecReport();
+ } catch (AvroRemoteException e) {
+ LOG.log(Level.SEVERE, "Server error!");
+ }
+ return null;
+ }
+
+ @Override
public String submitJob(Job exec, JobInput in) throws JobExecutionException {
try {
- return proxy.handleJob(AvroTypeFactory.getAvroJob(exec),AvroTypeFactory.getAvroJobInput(in));
+ return proxy.handleJob(AvroTypeFactory.getAvroJob(exec), AvroTypeFactory.getAvroJobInput(in));
} catch (AvroRemoteException e) {
LOG.log(Level.SEVERE,
"Server error!");
@@ -243,7 +264,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
@Override
public void setNodeCapacity(String nodeId, int capacity) throws MonitorException {
try {
- proxy.setNodeCapacity(nodeId,capacity);
+ proxy.setNodeCapacity(nodeId, capacity);
} catch (AvroRemoteException e) {
throw new MonitorException(e);
}
@@ -252,7 +273,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
@Override
public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
try {
- proxy.addNodeToQueue(nodeId,queueName);
+ proxy.addNodeToQueue(nodeId, queueName);
} catch (AvroRemoteException e) {
throw new QueueManagerException(e);
}
@@ -261,7 +282,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
@Override
public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
try {
- proxy.removeNodeFromQueue(nodeId,queueName);
+ proxy.removeNodeFromQueue(nodeId, queueName);
} catch (AvroRemoteException e) {
throw new QueueManagerException(e);
}
@@ -302,4 +323,13 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
throw new MonitorException(e);
}
}
+
+ @Override
+ public List getQueuedJobs() throws JobQueueException {
+ try {
+ return proxy.getQueuedJobs();
+ } catch (AvroRemoteException e) {
+ throw new JobQueueException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
index 5cbf6d3..c09b299 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
@@ -17,15 +17,11 @@
package org.apache.oodt.cas.resource.system;
-import org.apache.oodt.cas.resource.structs.exceptions.*;
-
-import java.util.Date;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Vector;
-
public interface ResourceManager {
- boolean shutdown();
+ void startUp() throws Exception;
+
+ boolean isAlive();
+ boolean shutdown();
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
index dd4444b..d5cacbe 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
@@ -21,6 +21,7 @@ import org.apache.oodt.cas.resource.structs.Job;
import org.apache.oodt.cas.resource.structs.JobInput;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
@@ -29,6 +30,7 @@ import java.net.URL;
import java.util.List;
public interface ResourceManagerClient {
+
boolean isJobComplete(String jobId) throws JobRepositoryException;
Job getJobInfo(String jobId) throws JobRepositoryException;
@@ -43,10 +45,13 @@ public interface ResourceManagerClient {
String getExecutionNode(String jobId);
+ String getNodeReport() throws MonitorException;
+
+ String getExecReport() throws JobRepositoryException;
+
String submitJob(Job exec, JobInput in) throws JobExecutionException;
- boolean submitJob(Job exec, JobInput in, URL hostUrl)
- throws JobExecutionException;
+ boolean submitJob(Job exec, JobInput in, URL hostUrl) throws JobExecutionException;
List getNodes() throws MonitorException;
@@ -77,4 +82,6 @@ public interface ResourceManagerClient {
List<String> getQueuesWithNode(String nodeId) throws QueueManagerException;
String getNodeLoad(String nodeId) throws MonitorException;
+
+ List getQueuedJobs() throws JobQueueException;
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
new file mode 100644
index 0000000..0880da1
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+public class ResourceManagerMain {
+
+ private static final Logger logger = LoggerFactory.getLogger(ResourceManagerMain.class);
+
+ public static void main(String[] args) throws Exception {
+ int portNum = -1;
+ String usage = "AvroRpcResourceManager --portNum <port number for xml rpc service>\n";
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("--portNum")) {
+ portNum = Integer.parseInt(args[++i]);
+ }
+ }
+
+ if (portNum == -1) {
+ System.err.println(usage);
+ System.exit(1);
+ }
+
+ String resourceManagerClass = System.getProperty("resmgr.manager",
+ "org.apache.oodt.cas.resource.system.AvroRpcResourceManager");
+
+ logger.info("Starting resource manager {} at port: {}", resourceManagerClass, portNum);
+
+ Constructor<?> constructor = Class.forName(resourceManagerClass).getConstructor(Integer.TYPE);
+ final ResourceManager manager = (ResourceManager) constructor.newInstance(portNum);
+ manager.startUp();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ manager.shutdown();
+ }
+ });
+
+ for (; ; )
+ try {
+ Thread.currentThread().join();
+ } catch (InterruptedException ignore) {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
index dd698cb..9ee48c8 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
@@ -57,25 +57,24 @@ import java.util.logging.Logger;
* <p>
* An XML RPC-based Resource manager.
* </p>
- *
+ *
*/
@Deprecated
-public class XmlRpcResourceManager {
-
- /* our log stream */
- private Logger LOG = Logger
- .getLogger(XmlRpcResourceManager.class.getName());
-
- /* our xml rpc web server */
- private WebServer webServer = null;
+public class XmlRpcResourceManager implements ResourceManager{
- /* our scheduler */
- private Scheduler scheduler = null;
+ /** our log stream */
+ private Logger LOG = Logger.getLogger(XmlRpcResourceManager.class.getName());
+ private int port;
+ /** our xml rpc web server */
+ private WebServer webServer;
+ /** our scheduler */
+ private Scheduler scheduler;
/** Configuration Manager instance of this instance */
private ConfigurationManager configurationManager;
public XmlRpcResourceManager(int port) throws IOException {
+ this.port = port;
List<String> propertiesFiles = new ArrayList<>();
// set up the configuration, if there is any
if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) {
@@ -83,6 +82,10 @@ public class XmlRpcResourceManager {
}
configurationManager = ConfigurationManagerFactory.getConfigurationManager(Component.RESOURCE_MANAGER, propertiesFiles);
+ }
+
+ @Override
+ public void startUp() throws Exception{
try {
configurationManager.loadConfiguration();
} catch (Exception e) {
@@ -100,8 +103,6 @@ public class XmlRpcResourceManager {
// start up the scheduler
new Thread(scheduler).start();
-
-
// start up the web server
webServer = new WebServer(port);
webServer.addHandler("resourcemgr", this);
@@ -109,13 +110,12 @@ public class XmlRpcResourceManager {
LOG.log(Level.INFO, "Resource Manager started by "
+ System.getProperty("user.name", "unknown"));
-
}
public boolean isAlive() {
return true;
}
-
+
/**
* Gets the number of Jobs in JobQueue
* @return Number of Jobs in JobQueue
@@ -128,7 +128,7 @@ public class XmlRpcResourceManager {
throw new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e);
}
}
-
+
/**
* Gets the max number of Jobs allowed in JobQueue
* @return Max number of Jobs
@@ -283,12 +283,12 @@ public class XmlRpcResourceManager {
public List<String> getQueues() {
return new Vector<String>(this.scheduler.getQueueManager().getQueues());
}
-
+
public boolean addQueue(String queueName) {
this.scheduler.getQueueManager().addQueue(queueName);
return true;
}
-
+
public boolean removeQueue(String queueName) {
this.scheduler.getQueueManager().removeQueue(queueName);
return true;
@@ -301,7 +301,7 @@ public class XmlRpcResourceManager {
this.scheduler.getMonitor().addNode(XmlRpcStructFactory.getResourceNodeFromXmlRpc(hashNode));
return true;
}
-
+
public boolean removeNode(String nodeId) throws MonitorException {
try{
for(String queueName: this.getQueuesWithNode(nodeId)){
@@ -311,31 +311,31 @@ public class XmlRpcResourceManager {
}catch(Exception e){
throw new MonitorException(e.getMessage(), e);
}
-
+
return true;
}
-
+
public boolean addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
this.scheduler.getQueueManager().addNodeToQueue(nodeId, queueName);
return true;
}
-
+
public boolean removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
this.scheduler.getQueueManager().removeNodeFromQueue(nodeId, queueName);
return true;
}
-
+
public List<String> getNodesInQueue(String queueName) throws QueueManagerException {
return new Vector<String>(this.scheduler.getQueueManager().getNodes(queueName));
}
-
+
public List<String> getQueuesWithNode(String nodeId) {
return new Vector<String>(this.scheduler.getQueueManager().getQueues(nodeId));
}
+ @Override
public boolean shutdown() {
configurationManager.clearConfiguration();
-
if (this.webServer != null) {
this.webServer.shutdown();
this.webServer = null;
@@ -344,37 +344,37 @@ public class XmlRpcResourceManager {
return false;
}
}
-
+
public String getNodeLoad(String nodeId) throws MonitorException{
ResourceNode node = this.scheduler.getMonitor().getNodeById(nodeId);
int capacity = node.getCapacity();
int load = (this.scheduler.getMonitor().getLoad(node)) * -1 + capacity;
return load + "/" + capacity;
}
-
+
public List getQueuedJobs() {
Vector jobs = new Vector();
List jobSpecs = this.scheduler.getJobQueue().getQueuedJobs();
-
+
if(jobSpecs != null && jobSpecs.size() > 0){
for (Object jobSpec : jobSpecs) {
Job job = ((JobSpec) jobSpec).getJob();
jobs.add(job);
}
}
-
+
return XmlRpcStructFactory.getXmlRpcJobList(jobs);
}
-
+
public String getNodeReport() throws MonitorException{
StringBuilder report = new StringBuilder();
-
+
try{
-
+
// get a sorted list of nodes
List nodes = scheduler.getMonitor().getNodes();
Collections.sort(nodes, new ResourceNodeComparator());
-
+
// formulate the report string
for (Object node1 : nodes) {
ResourceNode node = (ResourceNode) node1;
@@ -390,19 +390,19 @@ public class XmlRpcResourceManager {
}
report.append("\n");
}
-
+
}catch(Exception e){
throw new MonitorException(e.getMessage(), e);
}
-
+
return report.toString();
}
-
+
public String getExecutionReport() throws JobRepositoryException{
StringBuilder report = new StringBuilder();
-
+
try{
-
+
// get a sorted list of all nodes, since the report should be
// alphabetically sorted by node
List resNodes = scheduler.getMonitor().getNodes();
@@ -415,7 +415,7 @@ public class XmlRpcResourceManager {
nodeIds.add(((ResourceNode) resNode).getNodeId());
}
Collections.sort(nodeIds);
-
+
// generate the report string
for(String nodeId: nodeIds){
List execJobIds = this.scheduler.getBatchmgr().getJobsOnNode(nodeId);
@@ -431,14 +431,14 @@ public class XmlRpcResourceManager {
}
}
}
-
+
}catch(Exception e){
throw new JobRepositoryException(e.getMessage(), e);
}
-
+
return report.toString();
}
-
+
public static void main(String[] args) throws IOException {
int portNum = -1;
String usage = "XmlRpcResourceManager --portNum <port number for xml rpc service>\n";
@@ -463,7 +463,7 @@ public class XmlRpcResourceManager {
}
}
}
-
+
public boolean setNodeCapacity(String nodeId, int capacity){
try{
this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity);
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
index 1fb4f84..a0ed618 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
@@ -52,7 +52,7 @@ import java.util.logging.Logger;
* <p>
* The XML RPC based resource manager client.
* </p>
- *
+ *
*/
@Deprecated
public class XmlRpcResourceManagerClient implements ResourceManagerClient {
@@ -76,7 +76,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
* Constructs a new XmlRpcResourceManagerClient with the given
* <code>url</code>.
* </p>
- *
+ *
* @param url
* The url pointer to the xml rpc resource manager service.
*/
@@ -188,7 +188,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new JobRepositoryException("Failed to get JobQueue from server : " + e.getMessage(), e);
}
}
-
+
/**
* Gets the max number of Jobs allowed in JobQueue
* @return Max number of Jobs
@@ -203,7 +203,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new JobRepositoryException("Failed to get JobQueue capacity from server : " + e.getMessage(), e);
}
}
-
+
@Override
public boolean killJob(String jobId) {
Vector argList = new Vector();
@@ -348,7 +348,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new QueueManagerException(e.getMessage(), e);
}
}
-
+
/**
* Removes the queue with the given name
* @param queueName The name of the queue to be removed
@@ -364,7 +364,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new QueueManagerException(e.getMessage(), e);
}
}
-
+
/**
* Adds a node
* @param node The node to be added
@@ -380,7 +380,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new MonitorException(e.getMessage(), e);
}
}
-
+
/**
* Removes the node with the given id
* @param nodeId The id of the node to be removed
@@ -396,7 +396,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new MonitorException(e.getMessage(), e);
}
}
-
+
@Override
public void setNodeCapacity(String nodeId, int capacity) throws MonitorException{
try{
@@ -408,7 +408,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new MonitorException(e.getMessage(), e);
}
}
-
+
/**
* Adds the node with the given id to the queue with the given name
* @param nodeId The id of the node to be added to the given queueName
@@ -426,7 +426,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new QueueManagerException(e.getMessage(), e);
}
}
-
+
/**
* Remove the node with the given id from the queue with the given name
* @param nodeId The id of the node to be removed from the given queueName
@@ -444,7 +444,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new QueueManagerException(e.getMessage(), e);
}
}
-
+
/**
* Gets a list of currently supported queue names
* @return A list of currently supported queue names
@@ -459,7 +459,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new QueueManagerException(e.getMessage(), e);
}
}
-
+
/**
* Gets a list of ids of the nodes in the given queue
* @param queueName The name of the queue to get node ids from
@@ -476,7 +476,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new QueueManagerException(e.getMessage(), e);
}
}
-
+
/**
* Gets a list of queues which contain the node with the given nodeId
* @param nodeId The id of the node to get queues it belongs to
@@ -493,7 +493,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
throw new QueueManagerException(e.getMessage(), e);
}
}
-
+
/**
* Report on the load of the requested node
* @param nodeId The id of the node to be polled
@@ -511,42 +511,44 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
}
}
+ @Override
public List getQueuedJobs() throws JobQueueException{
Vector queuedJobs;
-
+
try{
queuedJobs = (Vector)client.execute("resourcemgr.getQueuedJobs", new Vector<Object>());
}catch(Exception e){
throw new JobQueueException(e.getMessage(), e);
}
-
+
return XmlRpcStructFactory.getJobListFromXmlRpc(queuedJobs);
- }
+ }
+ @Override
public String getNodeReport() throws MonitorException{
String report;
-
+
try{
report = (String)client.execute("resourcemgr.getNodeReport", new Vector<Object>());
}catch(Exception e){
throw new MonitorException(e.getMessage(), e);
}
-
+
return report;
}
public String getExecReport() throws JobRepositoryException{
String report;
-
+
try{
report = (String)client.execute("resourcemgr.getExecutionReport", new Vector<Object>());
}catch(Exception e){
throw new JobRepositoryException(e.getMessage(), e);
}
-
+
return report;
- }
+ }
public static String getReadableJobStatus(String status) {
if (status.equals(JobStatus.SUCCESS)) {
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
index b5cf5eb..1033fad 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
@@ -17,10 +17,12 @@
package org.apache.oodt.cas.resource.system;
-import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.oodt.cas.resource.structs.NameValueJobInput;
import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.File;
import java.io.FileFilter;
@@ -29,7 +31,11 @@ import java.io.IOException;
import java.net.URL;
import java.util.Properties;
-public class TestAvroRpcResourceManager extends TestCase {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class TestAvroRpcResourceManager {
private File tmpPolicyDir;
@@ -37,21 +43,27 @@ public class TestAvroRpcResourceManager extends TestCase {
private static final int RM_PORT = 50001;
- public void testFake() {
-
+ @Before
+ public void setUp() throws Exception {
+ try {
+ System.out.println(NameValueJobInput.class.getCanonicalName());
+ generateTestConfiguration();
+ rm = new AvroRpcResourceManager(RM_PORT);
+ rm.startUp();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
-
+
/**
* @since OODT-182
*/
- //Disabled until API impl can be finished
- public void XtestDynSetNodeCapacity() {
+ @Test
+ public void testDynSetNodeCapacity() {
AvroRpcResourceManagerClient rmc = null;
try {
- rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:"
- + RM_PORT));
+ rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT));
} catch (Exception e) {
- System.out.println("radu1");
e.printStackTrace();
fail(e.getMessage());
}
@@ -60,7 +72,6 @@ public class TestAvroRpcResourceManager extends TestCase {
try {
rmc.setNodeCapacity("localhost", 8);
} catch (MonitorException e) {
- System.out.println("radu2");
e.printStackTrace();
fail(e.getMessage());
}
@@ -68,9 +79,7 @@ public class TestAvroRpcResourceManager extends TestCase {
int setCapacity = -1;
try {
setCapacity = rmc.getNodeById("localhost").getCapacity();
-
} catch (Exception e) {
- System.out.println("radu3");
e.printStackTrace();
fail(e.getMessage());
}
@@ -78,31 +87,11 @@ public class TestAvroRpcResourceManager extends TestCase {
}
- /*
- * (non-Javadoc)
- *
- * @see junit.framework.TestCase#setUp()
- */
- @Override
- protected void setUp() throws Exception {
- try {
- System.out.println(NameValueJobInput.class.getCanonicalName());
- generateTestConfiguration();
- this.rm = new AvroRpcResourceManager(RM_PORT);
- }
- catch (Exception e){
- e.printStackTrace();
+ @After
+ public void tearDown() {
+ if (this.rm != null) {
+ this.rm.shutdown();
}
- }
-
- /*
- * (non-Javadoc)
- *
- * @see junit.framework.TestCase#tearDown()
- */
- @Override
- protected void tearDown() throws Exception {
- if (this.rm != null) this.rm.shutdown();
deleteAllFiles(this.tmpPolicyDir.getAbsolutePath());
}
@@ -117,7 +106,6 @@ public class TestAvroRpcResourceManager extends TestCase {
}
startDirFile.delete();
-
}
private void generateTestConfiguration() throws IOException {
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java
new file mode 100644
index 0000000..d088bab
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the AvroRpcResourceManagerClient to ensure communications between client and server operate correctly.
+ */
+public class TestAvroRpcResourceManagerClient {
+
+ private static final int RM_PORT = 50001;
+
+ private static AvroRpcResourceManagerClient rmc;
+ private static AvroRpcResourceManager rm;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ generateTestConfiguration();
+ rm = new AvroRpcResourceManager(RM_PORT);
+ rm.startUp();
+ rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT));
+ }
+
+ private static void generateTestConfiguration() throws IOException {
+ Properties config = new Properties();
+
+ String propertiesFile = "." + File.separator + "src" + File.separator +
+ "test" + File.separator + "resources" + File.separator + "test.resource.properties";
+ System.getProperties().load(new FileInputStream(new File(propertiesFile)));
+
+ // stage policy
+ File tmpPolicyDir = null;
+ try {
+ tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ for (File policyFile : new File("./src/test/resources/policy")
+ .listFiles(new FileFilter() {
+
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.isFile() && pathname.getName().endsWith(".xml");
+ }
+ })) {
+ try {
+ FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir
+ .toURI().toString());
+ config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs",
+ tmpPolicyDir.toURI().toString());
+
+ System.getProperties().putAll(config);
+
+ }
+
+ @Test
+ public void testGetNodes() throws MonitorException {
+ List<Hashtable> nodes = rmc.getNodes();
+
+ assertThat(nodes, is(not(nullValue())));
+ assertThat(nodes, hasSize(1));
+
+ }
+
+ @Test
+ public void testGetExecutionReport() throws JobRepositoryException {
+ String execreport = rmc.getExecReport();
+ assertThat(execreport, is(not(nullValue())));
+ //TODO make it return more than an empty string;
+ }
+
+
+ @Test
+ public void testJobQueueCapacity() throws JobRepositoryException {
+ int capacity = rmc.getJobQueueCapacity();
+ assertThat(capacity, equalTo(1000));
+ }
+
+ @Test
+ public void testGetJobQueueSize() throws JobRepositoryException {
+ int size = rmc.getJobQueueSize();
+ assertThat(size, equalTo(0));
+ //TODO Make it change queue size
+ }
+
+ @Test
+ public void testGetNodeById() throws MonitorException {
+ List<ResourceNode> nodelist = rmc.getNodes();
+
+ ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId());
+
+ assertThat(node, is(not(nullValue())));
+
+ assertThat(node.getNodeId(), equalTo("localhost"));
+ }
+
+
+ @Test
+ public void testGetNodeLoad() throws MonitorException {
+
+ List<ResourceNode> nodelist = rmc.getNodes();
+
+ String node = rmc.getNodeLoad(nodelist.get(0).getNodeId());
+
+ assertNotNull(node);
+
+ assertThat(node, equalTo("0/8"));
+
+ }
+
+ @Test
+ public void testNodeReport() throws MonitorException {
+ String report = rmc.getNodeReport();
+
+ assertThat(report, is(not(nullValue())));
+ }
+
+ @Test
+ public void testGetNodesInQueue() throws QueueManagerException {
+ List<String> nodes = rmc.getNodesInQueue("long");
+
+ assertThat(nodes, is(not(nullValue())));
+
+ assertThat(nodes, hasSize(1));
+
+ }
+
+
+ @Test
+ public void testQueuedJobs() throws JobQueueException {
+ List jobs = rmc.getQueuedJobs();
+
+ assertThat(jobs, is(not(nullValue())));
+
+ //TODO queue a job
+ }
+
+ @Test
+ public void testQueuesWithNode() throws MonitorException, QueueManagerException {
+ List<ResourceNode> nodelist = rmc.getNodes();
+
+
+ List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId());
+ assertThat(queues, hasSize(3));
+
+ assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+ }
+
+ @Test
+ public void testQueues() throws QueueManagerException {
+ List<String> queues = rmc.getQueues();
+
+ assertThat(queues, hasSize(3));
+
+ assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ rm.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
index db5464b..b9b3860 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
@@ -87,11 +87,10 @@ public class TestXmlRpcResourceManager extends TestCase {
*/
@Override
protected void setUp() throws Exception {
-
System.out.println(NameValueJobInput.class.getCanonicalName());
-
generateTestConfiguration();
this.rm = new XmlRpcResourceManager(RM_PORT);
+ rm.startUp();
}
/*
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
index cef79e4..8b1df40 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
@@ -19,7 +19,7 @@ import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
-
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -33,7 +33,11 @@ import java.util.List;
import java.util.Properties;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
@@ -43,164 +47,166 @@ import static org.junit.Assert.fail;
*/
public class TestXmlRpcResourceManagerClient {
- private static final int RM_PORT = 50001;
-
- private static XmlRpcResourceManagerClient rmc;
-
- @BeforeClass
- public static void setUp() throws Exception {
- generateTestConfiguration();
- XmlRpcResourceManager rm = new XmlRpcResourceManager(RM_PORT);
- rmc = new XmlRpcResourceManagerClient(new URL("http://localhost:" +RM_PORT));
+ private static final int RM_PORT = 50001;
- }
+ private static ResourceManagerClient rmc;
+ private static ResourceManager rm;
- private static void generateTestConfiguration() throws IOException {
- Properties config = new Properties();
-
- String propertiesFile = "." + File.separator + "src" + File.separator +
- "test" + File.separator + "resources" + File.separator + "test.resource.properties";
- System.getProperties().load(new FileInputStream(new File(propertiesFile)));
-
- // stage policy
- File tmpPolicyDir = null;
- try {
- tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- for (File policyFile : new File("./src/test/resources/policy")
- .listFiles(new FileFilter() {
-
- @Override
- public boolean accept(File pathname) {
- return pathname.isFile() && pathname.getName().endsWith(".xml");
- }
- })) {
- try {
- FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir);
- } catch (Exception e) {
- fail(e.getMessage());
- }
+ @BeforeClass
+ public static void setUp() throws Exception {
+ generateTestConfiguration();
+ rm = new XmlRpcResourceManager(RM_PORT);
+ rm.startUp();
+ rmc = new XmlRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT));
}
- config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir
- .toURI().toString());
- config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs",
- tmpPolicyDir.toURI().toString());
+ private static void generateTestConfiguration() throws IOException {
+ Properties config = new Properties();
+
+ String propertiesFile = "." + File.separator + "src" + File.separator +
+ "test" + File.separator + "resources" + File.separator + "test.resource.properties";
+ System.getProperties().load(new FileInputStream(new File(propertiesFile)));
+
+ // stage policy
+ File tmpPolicyDir = null;
+ try {
+ tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ for (File policyFile : new File("./src/test/resources/policy")
+ .listFiles(new FileFilter() {
+
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.isFile() && pathname.getName().endsWith(".xml");
+ }
+ })) {
+ try {
+ FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir
+ .toURI().toString());
+ config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs",
+ tmpPolicyDir.toURI().toString());
+
+ System.getProperties().putAll(config);
- System.getProperties().putAll(config);
-
- }
-
- @Test
- public void testGetNodes() throws MonitorException {
- List<Hashtable> nodes = rmc.getNodes();
-
- assertThat(nodes, is(not(nullValue())));
- assertThat(nodes, hasSize(1));
-
- }
+ }
- @Test
- public void testGetExecutionReport() throws JobRepositoryException {
+ @Test
+ public void testGetNodes() throws MonitorException {
+ List<Hashtable> nodes = rmc.getNodes();
- String execreport = rmc.getExecReport();
+ assertThat(nodes, is(not(nullValue())));
+ assertThat(nodes, hasSize(1));
+ }
- assertThat(execreport, is(not(nullValue())));
- //TODO make it return more than an empty string;
- }
+ @Test
+ public void testGetExecutionReport() throws JobRepositoryException {
+ String execreport = rmc.getExecReport();
+ assertThat(execreport, is(not(nullValue())));
+ //TODO make it return more than an empty string;
+ }
- @Test
- public void testJobQueueCapacity() throws JobRepositoryException {
- int capacity = rmc.getJobQueueCapacity();
- assertThat(capacity, equalTo(1000));
+ @Test
+ public void testJobQueueCapacity() throws JobRepositoryException {
+ int capacity = rmc.getJobQueueCapacity();
- }
+ assertThat(capacity, equalTo(1000));
- @Test
- public void testGetJobQueueSize() throws JobRepositoryException {
- int size = rmc.getJobQueueSize();
+ }
- assertThat(size, equalTo(0));
+ @Test
+ public void testGetJobQueueSize() throws JobRepositoryException {
+ int size = rmc.getJobQueueSize();
- //TODO Make it change queue size
+ assertThat(size, equalTo(0));
- }
+ //TODO Make it change queue size
- @Test
- public void testGetNodeById() throws MonitorException {
- List<ResourceNode> nodelist = rmc.getNodes();
+ }
- ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId());
+ @Test
+ public void testGetNodeById() throws MonitorException {
+ List<ResourceNode> nodelist = rmc.getNodes();
- assertThat(node, is(not(nullValue())));
+ ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId());
- assertThat(node.getNodeId(), equalTo("localhost"));
- }
+ assertThat(node, is(not(nullValue())));
+ assertThat(node.getNodeId(), equalTo("localhost"));
+ }
- @Test
- public void testGetNodeLoad() throws MonitorException {
- List<ResourceNode> nodelist = rmc.getNodes();
+ @Test
+ public void testGetNodeLoad() throws MonitorException {
- String node = rmc.getNodeLoad(nodelist.get(0).getNodeId());
+ List<ResourceNode> nodelist = rmc.getNodes();
- assertNotNull(node);
+ String node = rmc.getNodeLoad(nodelist.get(0).getNodeId());
- assertThat(node, equalTo("0/8"));
+ assertNotNull(node);
- }
+ assertThat(node, equalTo("0/8"));
- @Test
- public void testNodeReport() throws MonitorException {
- String report = rmc.getNodeReport();
+ }
- assertThat(report, is(not(nullValue())));
- }
+ @Test
+ public void testNodeReport() throws MonitorException {
+ String report = rmc.getNodeReport();
- @Test
- public void testGetNodesInQueue() throws QueueManagerException {
- List<String> nodes = rmc.getNodesInQueue("long");
+ assertThat(report, is(not(nullValue())));
+ }
- assertThat(nodes, is(not(nullValue())));
+ @Test
+ public void testGetNodesInQueue() throws QueueManagerException {
+ List<String> nodes = rmc.getNodesInQueue("long");
- assertThat(nodes, hasSize(1));
+ assertThat(nodes, is(not(nullValue())));
- }
+ assertThat(nodes, hasSize(1));
+ }
- @Test
- public void testQueuedJobs() throws JobQueueException {
- List jobs = rmc.getQueuedJobs();
+ @Test
+ public void testQueuedJobs() throws JobQueueException {
+ List jobs = rmc.getQueuedJobs();
- assertThat(jobs, is(not(nullValue())));
+ assertThat(jobs, is(not(nullValue())));
- //TODO queue a job
- }
+ //TODO queue a job
+ }
- @Test
- public void testQueuesWithNode() throws MonitorException, QueueManagerException {
- List<ResourceNode> nodelist = rmc.getNodes();
+ @Test
+ public void testQueuesWithNode() throws MonitorException, QueueManagerException {
+ List<ResourceNode> nodelist = rmc.getNodes();
- List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId());
- assertThat(queues, hasSize(3));
+ List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId());
+ assertThat(queues, hasSize(3));
- assertThat(queues, containsInAnyOrder("high", "quick", "long"));
- }
+ assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+ }
- @Test
- public void testQueues() throws QueueManagerException {
- List<String> queues = rmc.getQueues();
+ @Test
+ public void testQueues() throws QueueManagerException {
+ List<String> queues = rmc.getQueues();
- assertThat(queues, hasSize(3));
+ assertThat(queues, hasSize(3));
- assertThat(queues, containsInAnyOrder("high", "quick", "long"));
- }
+ assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+ }
+ @AfterClass
+ public static void tearDown() {
+ rm.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java
new file mode 100644
index 0000000..25fd2ee
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system.distributed;
+
+import org.apache.oodt.cas.resource.system.AvroRpcResourceManager;
+import org.apache.oodt.cas.resource.system.ResourceManager;
+import org.apache.oodt.cas.resource.system.TestAvroRpcResourceManager;
+import org.apache.oodt.config.distributed.cli.ConfigPublisher;
+import org.apache.oodt.config.test.AbstractDistributedConfigurationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.oodt.config.Constants.Properties.ENABLE_DISTRIBUTED_CONFIGURATION;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the operation of Resource Manager under distributed configuration management enabled
+ *
+ * @author Imesha Sudasingha
+ */
+public class TestDistributedAvroRpcResourceManager extends AbstractDistributedConfigurationTest {
+
+    private static final int RM_PORT = 50001;
+    private static final String CONF_PUBLISHER_XML = "config/distributed/config-publisher.xml";
+
+    private ResourceManager resourceManager;
+
+    /**
+     * Publishes the configuration to ZooKeeper and starts an Avro RPC resource manager on {@link #RM_PORT}.
+     *
+     * @throws Exception if publishing the configuration or starting the resource manager fails
+     */
+    @Before
+    public void setUpTest() throws Exception {
+        System.setProperty("org.apache.oodt.cas.cli.action.spring.config", "../config/src/main/resources/cmd-line-actions.xml");
+        System.setProperty("org.apache.oodt.cas.cli.option.spring.config", "../config/src/main/resources/cmd-line-options.xml");
+        System.setProperty(ENABLE_DISTRIBUTED_CONFIGURATION, "true");
+
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONF_PUBLISHER_XML,
+                "-a", "publish"
+        });
+
+        // Let start-up failures propagate unchanged: the method already declares Exception, and
+        // rethrowing keeps the full stack trace and cause instead of reducing them to a message.
+        resourceManager = new AvroRpcResourceManager(RM_PORT);
+        resourceManager.startUp();
+    }
+
+    /**
+     * Re-runs the node capacity test of {@link TestAvroRpcResourceManager} while distributed configuration is enabled.
+     */
+    @Test
+    public void testDynSetNodeCapacity() {
+        new TestAvroRpcResourceManager().testDynSetNodeCapacity();
+    }
+
+    /**
+     * Shuts the resource manager down (if it was created), clears the published configuration
+     * and removes the system properties set by {@link #setUpTest()}.
+     *
+     * @throws Exception if clearing the published configuration fails
+     */
+    @After
+    public void tearDownTest() throws Exception {
+        // resourceManager stays null when setUpTest() failed before constructing it
+        if (resourceManager != null) {
+            resourceManager.shutdown();
+        }
+
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONF_PUBLISHER_XML,
+                "-a", "clear"
+        });
+
+        System.clearProperty("org.apache.oodt.cas.cli.action.spring.config");
+        System.clearProperty("org.apache.oodt.cas.cli.option.spring.config");
+        System.clearProperty(ENABLE_DISTRIBUTED_CONFIGURATION);
+    }
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
index 0649b44..16a2b6f 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
@@ -54,6 +54,7 @@ public class TestDistributedXmlRpcResourceManager extends AbstractDistributedCon
try {
resourceManager = new XmlRpcResourceManager(RM_PORT);
+ resourceManager.startUp();
} catch (Exception e) {
fail(e.getMessage());
}
[2/2] oodt git commit: Altered resource manager client CLI to use
avro resource manager client
Posted by ma...@apache.org.
Altered resource manager client CLI to use avro resource manager client
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/9feacbb0
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/9feacbb0
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/9feacbb0
Branch: refs/heads/development
Commit: 9feacbb01b573c0eaf9f85972414ac9eaa784841
Parents: 1b921b5
Author: Imesha Sudasingha <im...@gmail.com>
Authored: Mon Mar 5 18:35:58 2018 +0530
Committer: Imesha Sudasingha <im...@gmail.com>
Committed: Mon Mar 5 18:35:58 2018 +0530
----------------------------------------------------------------------
resource/src/main/bin/resmgr-client | 2 +-
.../resource/cli/action/ResourceCliAction.java | 56 ++++++++--------
.../system/ResourceManagerClientMain.java | 28 ++++++++
.../resource/system/ResourceManagerMain.java | 14 ++--
.../system/rpc/ResourceManagerFactory.java | 67 ++++++++++++++++++++
5 files changed, 130 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/bin/resmgr-client
----------------------------------------------------------------------
diff --git a/resource/src/main/bin/resmgr-client b/resource/src/main/bin/resmgr-client
index e7dbd3d..0376b66 100644
--- a/resource/src/main/bin/resmgr-client
+++ b/resource/src/main/bin/resmgr-client
@@ -31,4 +31,4 @@ $JAVA_HOME/bin/java \
-Djava.util.logging.config.file=../etc/logging.properties \
-Dorg.apache.oodt.cas.cli.action.spring.config=../policy/cmd-line-actions.xml \
-Dorg.apache.oodt.cas.cli.option.spring.config=../policy/cmd-line-options.xml \
- org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient $*
+ org.apache.oodt.cas.resource.system.ResourceManagerClientMain $*
http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java b/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
index 5cbb360..114be7f 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
@@ -16,16 +16,17 @@
*/
package org.apache.oodt.cas.resource.cli.action;
-//JDK imports
-import java.net.MalformedURLException;
-import java.net.URL;
//Apache imports
import org.apache.commons.lang.Validate;
-
-//OODT imports
import org.apache.oodt.cas.cli.action.CmdLineAction;
-import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.resource.system.ResourceManagerClient;
+import org.apache.oodt.cas.resource.system.rpc.ResourceManagerFactory;
+
+//JDK imports
+import java.net.MalformedURLException;
+import java.net.URL;
+
/**
* Base {@link CmdLineAction} for Resource Manager.
@@ -34,24 +35,27 @@ import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
*/
public abstract class ResourceCliAction extends CmdLineAction {
- private XmlRpcResourceManagerClient client;
-
- public String getUrl() {
- return System.getProperty("org.apache.oodt.cas.resource.url");
- }
-
- protected XmlRpcResourceManagerClient getClient()
- throws MalformedURLException {
- Validate.notNull(getUrl());
-
- if (client != null) {
- return client;
- } else {
- return new XmlRpcResourceManagerClient(new URL(getUrl()));
- }
- }
-
- public void setClient(XmlRpcResourceManagerClient client) {
- this.client = client;
- }
+    /** Client injected through {@link #setClient(ResourceManagerClient)}; null until one is set. */
+    private ResourceManagerClient client;
+
+    /**
+     * @return the value of the {@code org.apache.oodt.cas.resource.url} system property, or null when it is not set
+     */
+    public String getUrl() {
+        return System.getProperty("org.apache.oodt.cas.resource.url");
+    }
+
+    /**
+     * Returns the injected client if one was set, otherwise creates a new client for {@link #getUrl()}
+     * through {@link ResourceManagerFactory}. A newly created client is not stored.
+     *
+     * @return the resource manager client to use
+     * @throws MalformedURLException if {@link #getUrl()} is not a valid URL
+     * @throws IllegalStateException if the factory fails to create the client
+     */
+    protected ResourceManagerClient getClient() throws MalformedURLException {
+        Validate.notNull(getUrl());
+
+        if (client != null) {
+            return client;
+        }
+
+        // Parse the URL outside the try block so that a malformed URL surfaces as the declared
+        // MalformedURLException instead of being rewrapped as an IllegalStateException.
+        URL url = new URL(getUrl());
+        try {
+            return ResourceManagerFactory.getResourceManagerClient(url);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to create client", e);
+        }
+    }
+
+    /**
+     * @param client the client that {@link #getClient()} should return from now on
+     */
+    public void setClient(ResourceManagerClient client) {
+        this.client = client;
+    }
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java
new file mode 100644
index 0000000..c0750fe
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.oodt.cas.cli.CmdLineUtility;
+
+/**
+ * Command line entry point that hands its arguments to a new {@link CmdLineUtility}.
+ */
+public class ResourceManagerClientMain {
+
+    /**
+     * @param args command line arguments, passed unchanged to {@link CmdLineUtility#run(String[])}
+     */
+    public static void main(String[] args) {
+        new CmdLineUtility().run(args);
+    }
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
index 0880da1..162d64e 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
@@ -17,11 +17,10 @@
package org.apache.oodt.cas.resource.system;
+import org.apache.oodt.cas.resource.system.rpc.ResourceManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Constructor;
-
public class ResourceManagerMain {
private static final Logger logger = LoggerFactory.getLogger(ResourceManagerMain.class);
@@ -41,13 +40,7 @@ public class ResourceManagerMain {
System.exit(1);
}
- String resourceManagerClass = System.getProperty("resmgr.manager",
- "org.apache.oodt.cas.resource.system.AvroRpcResourceManager");
-
- logger.info("Starting resource manager {} at port: {}", resourceManagerClass, portNum);
-
- Constructor<?> constructor = Class.forName(resourceManagerClass).getConstructor(Integer.TYPE);
- final ResourceManager manager = (ResourceManager) constructor.newInstance(portNum);
+ final ResourceManager manager = ResourceManagerFactory.getResourceManager(portNum);
manager.startUp();
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -57,10 +50,11 @@ public class ResourceManagerMain {
}
});
- for (; ; )
+ for (; ; ) {
try {
Thread.currentThread().join();
} catch (InterruptedException ignore) {
}
+ }
}
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java
new file mode 100644
index 0000000..9287f6e
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system.rpc;
+
+import org.apache.oodt.cas.resource.system.ResourceManager;
+import org.apache.oodt.cas.resource.system.ResourceManagerClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.net.URL;
+
+/**
+ * Creates resource manager servers and clients reflectively from class names given as system properties.
+ */
+public class ResourceManagerFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(ResourceManagerFactory.class);
+
+    /**
+     * Instantiates the class named by the {@code resmgr.manager} system property
+     * (default {@code org.apache.oodt.cas.resource.system.AvroRpcResourceManager})
+     * through its single {@code int} constructor.
+     *
+     * @param port the port passed to the resource manager's constructor
+     * @return the newly created resource manager
+     * @throws Exception if the class cannot be loaded, has no {@code (int)} constructor, or its constructor fails
+     */
+    public static ResourceManager getResourceManager(int port) throws Exception {
+        String resourceManagerClass = System.getProperty("resmgr.manager",
+                "org.apache.oodt.cas.resource.system.AvroRpcResourceManager");
+
+        logger.info("Creating resource manager {} at port: {}", resourceManagerClass, port);
+
+        ResourceManager manager;
+        try {
+            Constructor<?> constructor = Class.forName(resourceManagerClass).getConstructor(Integer.TYPE);
+            manager = (ResourceManager) constructor.newInstance(port);
+        } catch (Exception e) {
+            logger.error("Unable to create resource manager", e);
+            throw e;
+        }
+
+        return manager;
+    }
+
+    /**
+     * Instantiates the class named by the {@code resmgr.manager.client} system property
+     * (default {@code org.apache.oodt.cas.resource.system.AvroRpcResourceManagerClient})
+     * through its single {@link URL} constructor.
+     *
+     * @param url the URL passed to the client's constructor
+     * @return the newly created resource manager client
+     * @throws Exception if the class cannot be loaded, has no {@code (URL)} constructor, or its constructor fails
+     */
+    public static ResourceManagerClient getResourceManagerClient(URL url) throws Exception {
+        String resMgrClientClass = System.getProperty("resmgr.manager.client",
+                "org.apache.oodt.cas.resource.system.AvroRpcResourceManagerClient");
+
+        logger.info("Creating resource manager client {}", resMgrClientClass);
+
+        ResourceManagerClient client;
+        try {
+            Constructor<?> constructor = Class.forName(resMgrClientClass).getConstructor(URL.class);
+            client = (ResourceManagerClient) constructor.newInstance(url);
+        } catch (Exception e) {
+            // Name the client here so the log entry is distinguishable from a server creation failure
+            logger.error("Unable to create resource manager client", e);
+            throw e;
+        }
+
+        return client;
+    }
+}