You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2018/03/12 04:35:38 UTC

[1/2] oodt git commit: Stabilized avro rpc resource manager client and server by: 1. Adding 3 more RPC methods to be backward compatible with existing RPC client. 2. Added tests for both client and server by adapting XML version's tests. 3. And they are

Repository: oodt
Updated Branches:
  refs/heads/development ba6ca1b4d -> 9feacbb01


Stabilized avro rpc resource manager client and server by:
1. Adding 3 more RPC methods to be backward compatible with existing RPC client.
2. Added tests for both client and server by adapting XML version's tests.
3. And they are passing


Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/1b921b5c
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/1b921b5c
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/1b921b5c

Branch: refs/heads/development
Commit: 1b921b5c64783d7e929cd435057fbdb2188e47c9
Parents: ba6ca1b
Author: Imesha Sudasingha <im...@gmail.com>
Authored: Fri Mar 2 23:18:14 2018 +0530
Committer: Imesha Sudasingha <im...@gmail.com>
Committed: Fri Mar 2 23:18:14 2018 +0530

----------------------------------------------------------------------
 .../org/apache/oodt/commons/AvroExecServer.java |  11 -
 .../avro/types/resource_manager_protocol.avdl   |   6 +
 resource/src/main/bin/resmgr                    |   3 +-
 .../oodt/cas/resource/scheduler/Scheduler.java  |  93 ++++---
 .../resource/system/AvroRpcResourceManager.java | 241 +++++++++++++------
 .../system/AvroRpcResourceManagerClient.java    |  40 ++-
 .../cas/resource/system/ResourceManager.java    |  12 +-
 .../resource/system/ResourceManagerClient.java  |  11 +-
 .../resource/system/ResourceManagerMain.java    |  66 +++++
 .../resource/system/XmlRpcResourceManager.java  |  86 +++----
 .../system/XmlRpcResourceManagerClient.java     |  46 ++--
 .../system/TestAvroRpcResourceManager.java      |  64 ++---
 .../TestAvroRpcResourceManagerClient.java       | 211 ++++++++++++++++
 .../system/TestXmlRpcResourceManager.java       |   3 +-
 .../system/TestXmlRpcResourceManagerClient.java | 238 +++++++++---------
 .../TestDistributedAvroRpcResourceManager.java  |  85 +++++++
 .../TestDistributedXmlRpcResourceManager.java   |   1 +
 17 files changed, 851 insertions(+), 366 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java b/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
index eb1c786..c779acd 100644
--- a/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
+++ b/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java
@@ -17,16 +17,9 @@
 
 package org.apache.oodt.commons;
 
-import org.apache.avro.Protocol;
-import org.apache.avro.ipc.HttpServer;
-import org.apache.avro.ipc.Responder;
-import org.apache.avro.ipc.Server;
-import org.apache.avro.ipc.generic.GenericResponder;
-import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.oodt.commons.io.Base64EncodingOutputStream;
 import org.apache.oodt.commons.util.LogInit;
 import org.apache.oodt.commons.util.XML;
-import org.apache.xmlrpc.XmlRpcServer;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.DocumentType;
@@ -69,10 +62,6 @@ public class AvroExecServer {
     /** The &lt;log&gt; element within the status document. */
     private Element logElement;
 
-    /** The XML-RPC interface to this server. */
-    private HttpServer server;
-
-
     /** Status DTD Document Type Definition formal public identifier. */
     public static final String STATUS_FPI = "-//JPL//DTD EDA Server Status 1.0";
 

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/avro/types/resource_manager_protocol.avdl
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/resource_manager_protocol.avdl b/resource/src/main/avro/types/resource_manager_protocol.avdl
index 8b3f43f..f5d6725 100644
--- a/resource/src/main/avro/types/resource_manager_protocol.avdl
+++ b/resource/src/main/avro/types/resource_manager_protocol.avdl
@@ -20,6 +20,12 @@ import schema "AvroResourceNode.avsc";
 
     string getExecutionNode(string jobId);
 
+    string getExecReport();
+
+    string getNodeReport();
+
+    array<AvroJob> getQueuedJobs();
+
     string handleJob(AvroJob exec, AvroJobInput into);
 
     boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, string hostUrl);

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/bin/resmgr
----------------------------------------------------------------------
diff --git a/resource/src/main/bin/resmgr b/resource/src/main/bin/resmgr
index da6ae29..f80cd90 100644
--- a/resource/src/main/bin/resmgr
+++ b/resource/src/main/bin/resmgr
@@ -51,7 +51,8 @@ case "$1" in
         $JAVA_HOME/bin/java -Djava.ext.dirs=${CAS_RESMGR_HOME}/lib \
         -Djava.util.logging.config.file=${CAS_RESMGR_HOME}/etc/logging.properties \
         -Dorg.apache.oodt.cas.resource.properties=${CAS_RESMGR_PROPS} \
-        org.apache.oodt.cas.resource.system.XmlRpcResourceManager --portNum $SERVER_PORT &       
+        -Dresmgr.manager=org.apache.oodt.cas.resource.system.AvroRpcResourceManager \
+        org.apache.oodt.cas.resource.system.ResourceManagerMain --portNum $SERVER_PORT &
         echo $! >${RUN_HOME}/cas.resmgr.pid 
         echo "OK"
         sleep 5

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java b/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
index 764527e..991722b 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
@@ -19,10 +19,10 @@
 package org.apache.oodt.cas.resource.scheduler;
 
 //OODT imports
+
 import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
 import org.apache.oodt.cas.resource.jobqueue.JobQueue;
 import org.apache.oodt.cas.resource.monitor.Monitor;
-import org.apache.oodt.cas.resource.scheduler.QueueManager;
 import org.apache.oodt.cas.resource.structs.JobSpec;
 import org.apache.oodt.cas.resource.structs.ResourceNode;
 import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
@@ -31,62 +31,57 @@ import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
  * @author woollard
  * @author bfoster
  * @version $Revision$
- * 
+ * <p>
  * <p>
  * A scheduler interface.
  * </p>
- * 
  */
-public interface Scheduler extends Runnable{
+public interface Scheduler extends Runnable {
 
-	/**
-	 * Schedules a job to be executed by a particular batch manager.
-	 * 
-	 * @param spec
-	 *            The {@link JobSpec} to schedule for execution.
-	 * @return Whether the job was successfully scheduled or not.
-  * @throws SchedulerException If there was any error scheduling
-  * the given {@link JobSpec}.
-	 */
+    /**
+     * Schedules a job to be executed by a particular batch manager.
+     *
+     * @param spec The {@link JobSpec} to schedule for execution.
+     * @return Whether the job was successfully scheduled or not.
+     * @throws SchedulerException If there was any error scheduling
+     *                            the given {@link JobSpec}.
+     */
     boolean schedule(JobSpec spec) throws SchedulerException;
 
 
- /**
-  * Returns the ResourceNode that is considered to be <quote>most available</quote>
-  * within our underlying set of resources for the given JobSpec.
-  * @param spec The JobSpec to find an available node for.
-  * @return The {@link ResourceNode} best suited to handle this {@link JobSpec}
-  * @throws SchedulerException If any error occurs.
-  */
- ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException;
+    /**
+     * Returns the ResourceNode that is considered to be <quote>most available</quote>
+     * within our underlying set of resources for the given JobSpec.
+     *
+     * @param spec The JobSpec to find an available node for.
+     * @return The {@link ResourceNode} best suited to handle this {@link JobSpec}
+     * @throws SchedulerException If any error occurs.
+     */
+    ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException;
+
+    /**
+     * @return The underlying {@link Monitor} used by this
+     * Scheduler.
+     */
+    Monitor getMonitor();
+
+    /**
+     * @return The underlying {@link Batchmgr} used by this
+     * Scheduler.
+     */
+    Batchmgr getBatchmgr();
+
+
+    /**
+     * @return The underlying {@link JobQueue} used by this
+     * Scheduler.
+     */
+    JobQueue getJobQueue();
 
- /**
-  * 
-  * @return The underlying {@link Monitor} used by this
-  * Scheduler.
-  */
- Monitor getMonitor();
- 
- /**
-  * 
-  * @return The underlying {@link Batchmgr} used by this
-  * Scheduler.
-  */
- Batchmgr getBatchmgr();
- 
- 
- /**
-  * 
-  * @return The underlying {@link JobQueue} used by this
-  * Scheduler.
-  */
- JobQueue getJobQueue();
+    /**
+     * @return The underlying {@link QueueManager} used by this
+     * Scheduler.
+     */
+    QueueManager getQueueManager();
 
- /**
-  * 
-  * @return The underlying {@link QueueManager} used by this
-  * Scheduler.
-  */
- QueueManager getQueueManager();
- 
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
index d224cf5..fc3e2ae 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
@@ -22,72 +22,91 @@ import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.oodt.cas.resource.scheduler.Scheduler;
-import org.apache.oodt.cas.resource.structs.*;
-import org.apache.oodt.cas.resource.structs.avrotypes.*;
-import org.apache.oodt.cas.resource.structs.exceptions.*;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJob;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJobInput;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
 import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
-import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
-import org.apache.xmlrpc.WebServer;
-
-import java.io.File;
-import java.io.FileInputStream;
+import org.apache.oodt.cas.resource.util.ResourceNodeComparator;
+import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigurationManager;
+import org.apache.oodt.config.ConfigurationManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.Hashtable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Vector;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager{
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-    private int port = 2000;
+public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager {
 
-    private Logger LOG = Logger
-            .getLogger(XmlRpcResourceManager.class.getName());
+    private static final Logger logger = LoggerFactory.getLogger(AvroRpcResourceManager.class);
 
+    private int port = 2000;
     private Server server;
+    /** our scheduler */
+    private Scheduler scheduler;
+    /** Configuration Manager instance of this instance */
+    private ConfigurationManager configurationManager;
+    private ExecutorService executorService;
 
-    /* our scheduler */
-    private Scheduler scheduler = null;
+    public AvroRpcResourceManager(int port) {
+        this.port = port;
 
-    public AvroRpcResourceManager(int port)  throws Exception{
-        // load properties from workflow manager properties file, if specified
+        List<String> propertiesFiles = new ArrayList<>();
+        // set up the configuration, if there is any
         if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) {
-            String configFile = System
-                    .getProperty("org.apache.oodt.cas.resource.properties");
-            LOG.log(Level.INFO,
-                    "Loading Resource Manager Configuration Properties from: ["
-                            + configFile + "]");
-            System.getProperties().load(
-                    new FileInputStream(new File(configFile)));
+            propertiesFiles.add(System.getProperty("org.apache.oodt.cas.resource.properties"));
         }
 
-        String schedulerClassStr = System.getProperty(
-                "resource.scheduler.factory",
-                "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory");
+        configurationManager = ConfigurationManagerFactory
+                .getConfigurationManager(Component.RESOURCE_MANAGER, propertiesFiles);
+    }
 
-        scheduler = GenericResourceManagerObjectFactory
-                .getSchedulerServiceFromFactory(schedulerClassStr);
+    @Override
+    public void startUp() throws Exception {
+        try {
+            configurationManager.loadConfiguration();
+        } catch (Exception e) {
+            logger.error("Unable to load configuration", e);
+            throw new IOException("Unable to load configuration", e);
+        }
 
-        // start up the scheduler
-        new Thread(scheduler).start();
+        String schedulerClassStr = System.getProperty("resource.scheduler.factory",
+                "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory");
+        scheduler = GenericResourceManagerObjectFactory.getSchedulerServiceFromFactory(schedulerClassStr);
 
-        this.port = port;
+        // start up the scheduler
+        executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(scheduler);
 
         // start up the web server
-        server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class,this),
+        server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class, this),
                 new InetSocketAddress(this.port));
         server.start();
 
-        LOG.log(Level.INFO, "Resource Manager started by "
-                + System.getProperty("user.name", "unknown"));
-
+        logger.info("Resource Manager started by {}", System.getProperty("user.name", "unknown"));
     }
 
     @Override
-    public boolean isAlive() throws AvroRemoteException {
+    public boolean isAlive() {
         return true;
     }
 
@@ -95,7 +114,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
     public int getJobQueueSize() throws AvroRemoteException {
         try {
             return this.scheduler.getJobQueue().getSize();
-        }catch (Exception e) {
+        } catch (Exception e) {
             throw new AvroRemoteException(new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e));
         }
     }
@@ -105,7 +124,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
     public int getJobQueueCapacity() throws AvroRemoteException {
         try {
             return this.scheduler.getJobQueue().getCapacity();
-        }catch (Exception e) {
+        } catch (Exception e) {
             throw new AvroRemoteException(new JobRepositoryException("Failed to get capacity of JobQueue : " + e.getMessage(), e));
         }
     }
@@ -117,10 +136,11 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
                     jobId);
             return scheduler.getJobQueue().getJobRepository().jobFinished(spec);
 
-        } catch(JobRepositoryException e ){
+        } catch (JobRepositoryException e) {
             throw new AvroRemoteException(e);
         }
     }
+
     @Override
     public AvroJob getJobInfo(String jobId) throws AvroRemoteException {
         JobSpec spec = null;
@@ -129,15 +149,12 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
             spec = scheduler.getJobQueue().getJobRepository()
                     .getJobById(jobId);
         } catch (JobRepositoryException e) {
-            LOG.log(Level.WARNING,
-                    "Exception communicating with job repository for job: ["
-                            + jobId + "]: Message: " + e.getMessage());
+            logger.warn("Exception communicating with job repository for job: [{}]: Message: {}", jobId, e.getMessage());
             throw new AvroRemoteException(new JobRepositoryException("Unable to get job: [" + jobId
                     + "] from repository!"));
         }
 
         return AvroTypeFactory.getAvroJob(spec.getJob());
-
     }
 
     @Override
@@ -152,7 +169,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
     @Override
     public boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, String hostUrl) throws AvroRemoteException {
         try {
-            return genericHandleJob(exec,in,hostUrl);
+            return genericHandleJob(exec, in, hostUrl);
         } catch (JobExecutionException e) {
             throw new AvroRemoteException(e);
         }
@@ -186,9 +203,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
     public boolean killJob(String jobId) throws AvroRemoteException {
         String resNodeId = scheduler.getBatchmgr().getExecutionNode(jobId);
         if (resNodeId == null) {
-            LOG.log(Level.WARNING, "Attempt to kill job: [" + jobId
-                    + "]: cannot find execution node"
-                    + " (has the job already finished?)");
+            logger.warn("Attempt to kill job: [{}]: cannot find execution node (has the job already finished?)", jobId);
             return false;
         }
         ResourceNode node = null;
@@ -205,14 +220,101 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
     public String getExecutionNode(String jobId) throws AvroRemoteException {
         String execNode = scheduler.getBatchmgr().getExecutionNode(jobId);
         if (execNode == null) {
-            LOG.log(Level.WARNING, "Job: [" + jobId
-                    + "] not currently executing on any known node");
+            logger.warn("Job: [{}] not currently executing on any known node", jobId);
             return "";
         } else
             return execNode;
     }
 
     @Override
+    public String getNodeReport() {
+        StringBuilder report = new StringBuilder();
+
+        try {
+
+            // get a sorted list of nodes
+            List nodes = scheduler.getMonitor().getNodes();
+            Collections.sort(nodes, new ResourceNodeComparator());
+
+            // formulate the report string
+            for (Object node1 : nodes) {
+                ResourceNode node = (ResourceNode) node1;
+                String nodeId = node.getNodeId();
+                report.append(nodeId);
+                report.append(" (").append(getNodeLoad(nodeId)).append("/").append(node.getCapacity()).append(")");
+                List<String> nodeQueues = getQueuesWithNode(nodeId);
+                if (nodeQueues != null && nodeQueues.size() > 0) {
+                    report.append(" -- ").append(nodeQueues.get(0));
+                    for (int j = 1; j < nodeQueues.size(); j++) {
+                        report.append(", ").append(nodeQueues.get(j));
+                    }
+                }
+                report.append("\n");
+            }
+        } catch (Exception e) {
+            return null;
+        }
+
+        return report.toString();
+    }
+
+    public List<AvroJob> getQueuedJobs() {
+        List<AvroJob> jobs = new ArrayList<>();
+        List jobSpecs = this.scheduler.getJobQueue().getQueuedJobs();
+
+        if (jobSpecs != null && jobSpecs.size() > 0) {
+            for (Object jobSpec : jobSpecs) {
+                Job job = ((JobSpec) jobSpec).getJob();
+                jobs.add(AvroTypeFactory.getAvroJob(job));
+            }
+        }
+
+        return jobs;
+    }
+
+    @Override
+    public String getExecReport() {
+        StringBuilder report = new StringBuilder();
+
+        try {
+
+            // get a sorted list of all nodes, since the report should be
+            // alphabetically sorted by node
+            List resNodes = scheduler.getMonitor().getNodes();
+            if (resNodes.size() == 0) {
+                throw new MonitorException(
+                        "No jobs can be executing, as there are no nodes in the Monitor");
+            }
+            Vector<String> nodeIds = new Vector<String>();
+            for (Object resNode : resNodes) {
+                nodeIds.add(((ResourceNode) resNode).getNodeId());
+            }
+            Collections.sort(nodeIds);
+
+            // generate the report string
+            for (String nodeId : nodeIds) {
+                List execJobIds = this.scheduler.getBatchmgr().getJobsOnNode(nodeId);
+                if (execJobIds != null && execJobIds.size() > 0) {
+                    for (Object execJobId : execJobIds) {
+                        String jobId = (String) execJobId;
+                        Job job = scheduler.getJobQueue().getJobRepository()
+                                .getJobById(jobId).getJob();
+                        report.append("job id=").append(jobId);
+                        report.append(", load=").append(job.getLoadValue());
+                        report.append(", node=").append(nodeId);
+                        report.append(", queue=").append(job.getQueueName()).append("\n");
+                    }
+                }
+            }
+
+        } catch (Exception e) {
+            return null;
+        }
+
+        return report.toString();
+    }
+
+    @Override
     public List<String> getQueues() throws AvroRemoteException {
         try {
             return this.scheduler.getQueueManager().getQueues();
@@ -255,12 +357,12 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
 
     @Override
     public boolean removeNode(String nodeId) throws AvroRemoteException {
-        try{
-            for(String queueName: this.getQueuesWithNode(nodeId)){
+        try {
+            for (String queueName : this.getQueuesWithNode(nodeId)) {
                 this.removeNodeFromQueue(nodeId, queueName);
             }
             this.scheduler.getMonitor().removeNodeById(nodeId);
-        }catch(Exception e){
+        } catch (Exception e) {
             throw new AvroRemoteException(new MonitorException(e.getMessage(), e));
         }
 
@@ -307,13 +409,18 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
         }
     }
 
-    public boolean shutdown(){
+    @Override
+    public boolean shutdown() {
+        configurationManager.clearConfiguration();
+        executorService.shutdownNow();
+
         if (this.server != null) {
             this.server.close();
             this.server = null;
             return true;
-        } else
+        } else {
             return false;
+        }
     }
 
     @Override
@@ -346,7 +453,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
 
         AvroRpcResourceManager manager = new AvroRpcResourceManager(portNum);
 
-        for (;;)
+        for (; ; )
             try {
                 Thread.currentThread().join();
             } catch (InterruptedException ignore) {
@@ -356,15 +463,13 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
 
     @Override
     public boolean setNodeCapacity(String nodeId, int capacity) throws AvroRemoteException {
-        try{
+        try {
             this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity);
-        }catch (MonitorException e){
-            LOG.log(Level.WARNING, "Exception setting capacity on node "
-                    + nodeId + ": " + e.getMessage());
+        } catch (MonitorException e) {
+            logger.warn("Exception setting capacity on node {}: ", nodeId, e.getMessage());
             return false;
         }
         return true;
-
     }
 
 
@@ -381,15 +486,14 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
         try {
             jobId = scheduler.getJobQueue().addJob(spec);
         } catch (JobQueueException e) {
-            LOG.log(Level.WARNING, "JobQueue exception adding job: Message: "
-                    + e.getMessage());
+            logger.warn("JobQueue exception adding job: Message: {}", e.getMessage());
             throw new SchedulerException(e.getMessage());
         }
         return jobId;
     }
 
     private boolean genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput,
-                                     String urlStr) throws JobExecutionException {
+            String urlStr) throws JobExecutionException {
         Job exec = AvroTypeFactory.getJob(avroJob);
         JobInput in = AvroTypeFactory.getJobInput(avroJobInput);
 
@@ -415,8 +519,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru
         try {
             url = new URL(urlStr);
         } catch (MalformedURLException e) {
-            LOG.log(Level.WARNING, "Error converting string: [" + urlStr
-                    + "] to URL object: Message: " + e.getMessage());
+            logger.warn("Error converting string: [{}] to URL object: Message: {}", urlStr, e.getMessage());
         }
 
         return url;

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
index fa0e84b..4dd0f33 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
@@ -26,11 +26,12 @@ import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
 import org.apache.oodt.cas.resource.structs.Job;
 import org.apache.oodt.cas.resource.structs.JobInput;
 import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager;
 import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
 import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
 import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
 import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
-import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -155,9 +156,29 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
     }
 
     @Override
+    public String getNodeReport() throws MonitorException {
+        try {
+            return proxy.getNodeReport();
+        } catch (AvroRemoteException e) {
+            LOG.log(Level.SEVERE, "Server error!");
+        }
+        return null;
+    }
+
+    @Override
+    public String getExecReport() throws JobRepositoryException {
+        try {
+            return proxy.getExecReport();
+        } catch (AvroRemoteException e) {
+            LOG.log(Level.SEVERE, "Server error!");
+        }
+        return null;
+    }
+
+    @Override
     public String submitJob(Job exec, JobInput in) throws JobExecutionException {
         try {
-            return proxy.handleJob(AvroTypeFactory.getAvroJob(exec),AvroTypeFactory.getAvroJobInput(in));
+            return proxy.handleJob(AvroTypeFactory.getAvroJob(exec), AvroTypeFactory.getAvroJobInput(in));
         } catch (AvroRemoteException e) {
             LOG.log(Level.SEVERE,
                     "Server error!");
@@ -243,7 +264,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
     @Override
     public void setNodeCapacity(String nodeId, int capacity) throws MonitorException {
         try {
-            proxy.setNodeCapacity(nodeId,capacity);
+            proxy.setNodeCapacity(nodeId, capacity);
         } catch (AvroRemoteException e) {
             throw new MonitorException(e);
         }
@@ -252,7 +273,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
     @Override
     public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
         try {
-            proxy.addNodeToQueue(nodeId,queueName);
+            proxy.addNodeToQueue(nodeId, queueName);
         } catch (AvroRemoteException e) {
             throw new QueueManagerException(e);
         }
@@ -261,7 +282,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
     @Override
     public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
         try {
-            proxy.removeNodeFromQueue(nodeId,queueName);
+            proxy.removeNodeFromQueue(nodeId, queueName);
         } catch (AvroRemoteException e) {
             throw new QueueManagerException(e);
         }
@@ -302,4 +323,13 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
             throw new MonitorException(e);
         }
     }
+
+    @Override
+    public List getQueuedJobs() throws JobQueueException {
+        try {
+            return proxy.getQueuedJobs();
+        } catch (AvroRemoteException e) {
+            throw new JobQueueException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
index 5cbf6d3..c09b299 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
@@ -17,15 +17,11 @@
 
 package org.apache.oodt.cas.resource.system;
 
-import org.apache.oodt.cas.resource.structs.exceptions.*;
-
-import java.util.Date;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Vector;
-
 public interface ResourceManager {
 
-        boolean shutdown();
+    void startUp() throws Exception;
+
+    boolean isAlive();
 
+    boolean shutdown();
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
index dd4444b..d5cacbe 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
@@ -21,6 +21,7 @@ import org.apache.oodt.cas.resource.structs.Job;
 import org.apache.oodt.cas.resource.structs.JobInput;
 import org.apache.oodt.cas.resource.structs.ResourceNode;
 import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
 import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
 import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
 import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
@@ -29,6 +30,7 @@ import java.net.URL;
 import java.util.List;
 
 public interface ResourceManagerClient {
+
     boolean isJobComplete(String jobId) throws JobRepositoryException;
 
     Job getJobInfo(String jobId) throws JobRepositoryException;
@@ -43,10 +45,13 @@ public interface ResourceManagerClient {
 
     String getExecutionNode(String jobId);
 
+    String getNodeReport() throws MonitorException;
+
+    String getExecReport() throws JobRepositoryException;
+
     String submitJob(Job exec, JobInput in) throws JobExecutionException;
 
-    boolean submitJob(Job exec, JobInput in, URL hostUrl)
-            throws JobExecutionException;
+    boolean submitJob(Job exec, JobInput in, URL hostUrl) throws JobExecutionException;
 
     List getNodes() throws MonitorException;
 
@@ -77,4 +82,6 @@ public interface ResourceManagerClient {
     List<String> getQueuesWithNode(String nodeId) throws QueueManagerException;
 
     String getNodeLoad(String nodeId) throws MonitorException;
+
+    List getQueuedJobs() throws JobQueueException;
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
new file mode 100644
index 0000000..0880da1
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+public class ResourceManagerMain {
+
+    private static final Logger logger = LoggerFactory.getLogger(ResourceManagerMain.class);
+
+    public static void main(String[] args) throws Exception {
+        int portNum = -1;
+        String usage = "AvroRpcResourceManager --portNum <port number for xml rpc service>\n";
+
+        for (int i = 0; i < args.length; i++) {
+            if (args[i].equals("--portNum")) {
+                portNum = Integer.parseInt(args[++i]);
+            }
+        }
+
+        if (portNum == -1) {
+            System.err.println(usage);
+            System.exit(1);
+        }
+
+        String resourceManagerClass = System.getProperty("resmgr.manager",
+                "org.apache.oodt.cas.resource.system.AvroRpcResourceManager");
+
+        logger.info("Starting resource manager {} at port: {}", resourceManagerClass, portNum);
+
+        Constructor<?> constructor = Class.forName(resourceManagerClass).getConstructor(Integer.TYPE);
+        final ResourceManager manager = (ResourceManager) constructor.newInstance(portNum);
+        manager.startUp();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                manager.shutdown();
+            }
+        });
+
+        for (; ; )
+            try {
+                Thread.currentThread().join();
+            } catch (InterruptedException ignore) {
+            }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
index dd698cb..9ee48c8 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java
@@ -57,25 +57,24 @@ import java.util.logging.Logger;
  * <p>
  * An XML RPC-based Resource manager.
  * </p>
- * 
+ *
  */
 @Deprecated
-public class XmlRpcResourceManager {
-
-    /* our log stream */
-    private Logger LOG = Logger
-            .getLogger(XmlRpcResourceManager.class.getName());
-
-    /* our xml rpc web server */
-    private WebServer webServer = null;
+public class XmlRpcResourceManager implements ResourceManager{
 
-    /* our scheduler */
-    private Scheduler scheduler = null;
+    /** our log stream */
+    private Logger LOG = Logger.getLogger(XmlRpcResourceManager.class.getName());
 
+    private int port;
+    /** our xml rpc web server */
+    private WebServer webServer;
+    /** our scheduler */
+    private Scheduler scheduler;
     /** Configuration Manager instance of this instance */
     private ConfigurationManager configurationManager;
 
     public XmlRpcResourceManager(int port) throws IOException {
+        this.port = port;
         List<String> propertiesFiles = new ArrayList<>();
         // set up the configuration, if there is any
         if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) {
@@ -83,6 +82,10 @@ public class XmlRpcResourceManager {
         }
 
         configurationManager = ConfigurationManagerFactory.getConfigurationManager(Component.RESOURCE_MANAGER, propertiesFiles);
+    }
+
+    @Override
+    public void startUp() throws Exception{
         try {
             configurationManager.loadConfiguration();
         } catch (Exception e) {
@@ -100,8 +103,6 @@ public class XmlRpcResourceManager {
         // start up the scheduler
         new Thread(scheduler).start();
 
-
-
         // start up the web server
         webServer = new WebServer(port);
         webServer.addHandler("resourcemgr", this);
@@ -109,13 +110,12 @@ public class XmlRpcResourceManager {
 
         LOG.log(Level.INFO, "Resource Manager started by "
                 + System.getProperty("user.name", "unknown"));
-
     }
 
     public boolean isAlive() {
         return true;
     }
-    
+
     /**
      * Gets the number of Jobs in JobQueue
      * @return Number of Jobs in JobQueue
@@ -128,7 +128,7 @@ public class XmlRpcResourceManager {
     		throw new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e);
     	}
     }
-    
+
     /**
      * Gets the max number of Jobs allowed in JobQueue
      * @return Max number of Jobs
@@ -283,12 +283,12 @@ public class XmlRpcResourceManager {
     public List<String> getQueues() {
     	return new Vector<String>(this.scheduler.getQueueManager().getQueues());
     }
-    
+
     public boolean addQueue(String queueName) {
     	this.scheduler.getQueueManager().addQueue(queueName);
     	return true;
     }
-    
+
     public boolean removeQueue(String queueName) {
     	this.scheduler.getQueueManager().removeQueue(queueName);
     	return true;
@@ -301,7 +301,7 @@ public class XmlRpcResourceManager {
     	this.scheduler.getMonitor().addNode(XmlRpcStructFactory.getResourceNodeFromXmlRpc(hashNode));
     	return true;
     }
-    
+
     public boolean removeNode(String nodeId) throws MonitorException {
     	try{
 	    	for(String queueName: this.getQueuesWithNode(nodeId)){
@@ -311,31 +311,31 @@ public class XmlRpcResourceManager {
     	}catch(Exception e){
     		throw new MonitorException(e.getMessage(), e);
     	}
-    	
+
     	return true;
     }
-    
+
     public boolean addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
     	this.scheduler.getQueueManager().addNodeToQueue(nodeId, queueName);
     	return true;
     }
-    
+
     public boolean removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
     	this.scheduler.getQueueManager().removeNodeFromQueue(nodeId, queueName);
     	return true;
     }
-    
+
     public List<String> getNodesInQueue(String queueName) throws QueueManagerException {
     	return new Vector<String>(this.scheduler.getQueueManager().getNodes(queueName));
     }
-    
+
     public List<String> getQueuesWithNode(String nodeId) {
     	return new Vector<String>(this.scheduler.getQueueManager().getQueues(nodeId));
     }
 
+    @Override
     public boolean shutdown() {
         configurationManager.clearConfiguration();
-
         if (this.webServer != null) {
             this.webServer.shutdown();
             this.webServer = null;
@@ -344,37 +344,37 @@ public class XmlRpcResourceManager {
             return false;
         }
     }
-    
+
     public String getNodeLoad(String nodeId) throws MonitorException{
     	ResourceNode node = this.scheduler.getMonitor().getNodeById(nodeId);
     	int capacity = node.getCapacity();
     	int load = (this.scheduler.getMonitor().getLoad(node)) * -1 + capacity;
     	return load + "/" + capacity;
     }
-    
+
     public List getQueuedJobs() {
     	Vector jobs = new Vector();
     	List jobSpecs = this.scheduler.getJobQueue().getQueuedJobs();
-    	
+
     	if(jobSpecs != null && jobSpecs.size() > 0){
             for (Object jobSpec : jobSpecs) {
                 Job job = ((JobSpec) jobSpec).getJob();
                 jobs.add(job);
             }
     	}
-    	
+
     	return XmlRpcStructFactory.getXmlRpcJobList(jobs);
     }
-    
+
     public String getNodeReport() throws MonitorException{
     	StringBuilder report = new StringBuilder();
-    	
+
     	try{
-    		
+
     		// get a sorted list of nodes
     		List nodes = scheduler.getMonitor().getNodes();
     		Collections.sort(nodes, new ResourceNodeComparator());
-    		
+
     		// formulate the report string
             for (Object node1 : nodes) {
                 ResourceNode node = (ResourceNode) node1;
@@ -390,19 +390,19 @@ public class XmlRpcResourceManager {
                 }
                 report.append("\n");
             }
-    	
+
     	}catch(Exception e){
     		throw new MonitorException(e.getMessage(), e);
     	}
-    	
+
     	return report.toString();
     }
-    
+
     public String getExecutionReport() throws JobRepositoryException{
     	StringBuilder report = new StringBuilder();
-    	
+
     	try{
-    	
+
 	    	// get a sorted list of all nodes, since the report should be
 	    	// alphabetically sorted by node
 	    	List resNodes = scheduler.getMonitor().getNodes();
@@ -415,7 +415,7 @@ public class XmlRpcResourceManager {
                 nodeIds.add(((ResourceNode) resNode).getNodeId());
             }
 	    	Collections.sort(nodeIds);
-	    	
+
 	    	// generate the report string
 	    	for(String nodeId: nodeIds){
 	    		List execJobIds = this.scheduler.getBatchmgr().getJobsOnNode(nodeId);
@@ -431,14 +431,14 @@ public class XmlRpcResourceManager {
                     }
 	    		}
 	    	}
-    	
+
     	}catch(Exception e){
     		throw new JobRepositoryException(e.getMessage(), e);
     	}
-    	
+
     	return report.toString();
     }
-    
+
     public static void main(String[] args) throws IOException {
         int portNum = -1;
         String usage = "XmlRpcResourceManager --portNum <port number for xml rpc service>\n";
@@ -463,7 +463,7 @@ public class XmlRpcResourceManager {
             }
         }
     }
-    
+
     public boolean setNodeCapacity(String nodeId, int capacity){
     	try{
     		this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity);

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
index 1fb4f84..a0ed618 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
@@ -52,7 +52,7 @@ import java.util.logging.Logger;
  * <p>
  * The XML RPC based resource manager client.
  * </p>
- * 
+ *
  */
 @Deprecated
 public class XmlRpcResourceManagerClient implements ResourceManagerClient {
@@ -76,7 +76,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
      * Constructs a new XmlRpcResourceManagerClient with the given
      * <code>url</code>.
      * </p>
-     * 
+     *
      * @param url
      *            The url pointer to the xml rpc resource manager service.
      */
@@ -188,7 +188,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new JobRepositoryException("Failed to get JobQueue from server : " + e.getMessage(), e);
         }
     }
-    
+
     /**
      * Gets the max number of Jobs allowed in JobQueue
      * @return Max number of Jobs
@@ -203,7 +203,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new JobRepositoryException("Failed to get JobQueue capacity from server : " + e.getMessage(), e);
         }
     }
-    
+
     @Override
     public boolean killJob(String jobId) {
         Vector argList = new Vector();
@@ -348,7 +348,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new QueueManagerException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Removes the queue with the given name
      * @param queueName The name of the queue to be removed
@@ -364,7 +364,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new QueueManagerException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Adds a node
      * @param node The node to be added
@@ -380,7 +380,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new MonitorException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Removes the node with the given id
      * @param nodeId The id of the node to be removed
@@ -396,7 +396,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new MonitorException(e.getMessage(), e);
         }
     }
-    
+
     @Override
     public void setNodeCapacity(String nodeId, int capacity) throws MonitorException{
     	try{
@@ -408,7 +408,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
     		throw new MonitorException(e.getMessage(), e);
     	}
     }
-    
+
     /**
      * Addes the node with given id to the queue with the given name
      * @param nodeId The id of the node to be added to the given queueName
@@ -426,7 +426,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new QueueManagerException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Remove the node with the given id from the queue with the given name
      * @param nodeId The id of the node to be remove from the given queueName
@@ -444,7 +444,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new QueueManagerException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Gets a list of currently supported queue names
      * @return A list of currently supported queue names
@@ -459,7 +459,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new QueueManagerException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Gets a list of ids of the nodes in the given queue
      * @param queueName The name of the queue to get node ids from
@@ -476,7 +476,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new QueueManagerException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Gets a list of queues which contain the node with the given nodeId
      * @param nodeId The id of the node to get queues it belongs to
@@ -493,7 +493,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
             throw new QueueManagerException(e.getMessage(), e);
         }
     }
-    
+
     /**
      * Report on the load of the requested node
      * @param nodeId The id of the node to be polled
@@ -511,42 +511,44 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient {
     	}
     }
 
+    @Override
    public List getQueuedJobs() throws JobQueueException{
         Vector queuedJobs;
-           
+
         try{
 	   queuedJobs = (Vector)client.execute("resourcemgr.getQueuedJobs", new Vector<Object>());
            }catch(Exception e){
 	      throw new JobQueueException(e.getMessage(), e);
            }
-	    
+
            return XmlRpcStructFactory.getJobListFromXmlRpc(queuedJobs);
-  }  
+  }
 
+    @Override
     public String getNodeReport() throws MonitorException{
 	String report;
-	
+
 	try{
 	    report = (String)client.execute("resourcemgr.getNodeReport", new Vector<Object>());
 	}catch(Exception e){
 	    throw new MonitorException(e.getMessage(), e);
         }
-	
+
        return report;
    }
 
 
     public String getExecReport() throws JobRepositoryException{
 	String report;
-	
+
 	try{
 	    report = (String)client.execute("resourcemgr.getExecutionReport", new Vector<Object>());
 	}catch(Exception e){
 	    throw new JobRepositoryException(e.getMessage(), e);
 	}
-	
+
 	return report;
-  }   
+  }
 
   public static String getReadableJobStatus(String status) {
     if (status.equals(JobStatus.SUCCESS)) {

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
index b5cf5eb..1033fad 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java
@@ -17,10 +17,12 @@
 
 package org.apache.oodt.cas.resource.system;
 
-import junit.framework.TestCase;
 import org.apache.commons.io.FileUtils;
 import org.apache.oodt.cas.resource.structs.NameValueJobInput;
 import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.FileFilter;
@@ -29,7 +31,11 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.Properties;
 
-public class TestAvroRpcResourceManager extends TestCase {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class TestAvroRpcResourceManager {
 
     private File tmpPolicyDir;
 
@@ -37,21 +43,27 @@ public class TestAvroRpcResourceManager extends TestCase {
 
     private static final int RM_PORT = 50001;
 
-    public void testFake() {
-      
+    @Before
+    public void setUp() throws Exception {
+        try {
+            System.out.println(NameValueJobInput.class.getCanonicalName());
+            generateTestConfiguration();
+            rm = new AvroRpcResourceManager(RM_PORT);
+            rm.startUp();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
-    
+
     /**
      * @since OODT-182
      */
-    //Disabled until API impl can be finished  
-    public void XtestDynSetNodeCapacity() {
+    @Test
+    public void testDynSetNodeCapacity() {
         AvroRpcResourceManagerClient rmc = null;
         try {
-            rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:"
-                    + RM_PORT));
+            rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT));
         } catch (Exception e) {
-            System.out.println("radu1");
             e.printStackTrace();
             fail(e.getMessage());
         }
@@ -60,7 +72,6 @@ public class TestAvroRpcResourceManager extends TestCase {
         try {
             rmc.setNodeCapacity("localhost", 8);
         } catch (MonitorException e) {
-            System.out.println("radu2");
             e.printStackTrace();
             fail(e.getMessage());
         }
@@ -68,9 +79,7 @@ public class TestAvroRpcResourceManager extends TestCase {
         int setCapacity = -1;
         try {
             setCapacity = rmc.getNodeById("localhost").getCapacity();
-
         } catch (Exception e) {
-            System.out.println("radu3");
             e.printStackTrace();
             fail(e.getMessage());
         }
@@ -78,31 +87,11 @@ public class TestAvroRpcResourceManager extends TestCase {
 
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see junit.framework.TestCase#setUp()
-     */
-    @Override
-    protected void setUp() throws Exception {
-        try {
-            System.out.println(NameValueJobInput.class.getCanonicalName());
-            generateTestConfiguration();
-            this.rm = new AvroRpcResourceManager(RM_PORT);
-        }
-        catch (Exception e){
-            e.printStackTrace();
+    @After
+    public void tearDown() {
+        if (this.rm != null) {
+            this.rm.shutdown();
         }
-    }
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see junit.framework.TestCase#tearDown()
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        if (this.rm != null) this.rm.shutdown();
         deleteAllFiles(this.tmpPolicyDir.getAbsolutePath());
     }
 
@@ -117,7 +106,6 @@ public class TestAvroRpcResourceManager extends TestCase {
         }
 
         startDirFile.delete();
-
     }
 
     private void generateTestConfiguration() throws IOException {

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java
new file mode 100644
index 0000000..d088bab
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the XmlRpcResourceManagerClient to ensure communications between client and server operate correctly.
+ */
+public class TestAvroRpcResourceManagerClient {
+
+    private static final int RM_PORT = 50001;
+
+    private static AvroRpcResourceManagerClient rmc;
+    private static AvroRpcResourceManager rm;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        generateTestConfiguration();
+        rm = new AvroRpcResourceManager(RM_PORT);
+        rm.startUp();
+        rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT));
+    }
+
+    private static void generateTestConfiguration() throws IOException {
+        Properties config = new Properties();
+
+        String propertiesFile = "." + File.separator + "src" + File.separator +
+                "test" + File.separator + "resources" + File.separator + "test.resource.properties";
+        System.getProperties().load(new FileInputStream(new File(propertiesFile)));
+
+        // stage policy
+        File tmpPolicyDir = null;
+        try {
+            tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile();
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+        for (File policyFile : new File("./src/test/resources/policy")
+                .listFiles(new FileFilter() {
+
+                    @Override
+                    public boolean accept(File pathname) {
+                        return pathname.isFile() && pathname.getName().endsWith(".xml");
+                    }
+                })) {
+            try {
+                FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir);
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+        }
+
+        config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir
+                .toURI().toString());
+        config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs",
+                tmpPolicyDir.toURI().toString());
+
+        System.getProperties().putAll(config);
+
+    }
+
+    @Test
+    public void testGetNodes() throws MonitorException {
+        List<Hashtable> nodes = rmc.getNodes();
+
+        assertThat(nodes, is(not(nullValue())));
+        assertThat(nodes, hasSize(1));
+
+    }
+
+    @Test
+    public void testGetExecutionReport() throws JobRepositoryException {
+        String execreport = rmc.getExecReport();
+        assertThat(execreport, is(not(nullValue())));
+        //TODO make it return more than an empty string;
+    }
+
+
+    @Test
+    public void testJobQueueCapacity() throws JobRepositoryException {
+        int capacity = rmc.getJobQueueCapacity();
+        assertThat(capacity, equalTo(1000));
+    }
+
+    @Test
+    public void testGetJobQueueSize() throws JobRepositoryException {
+        int size = rmc.getJobQueueSize();
+        assertThat(size, equalTo(0));
+        //TODO Make it change queue size
+    }
+
+    @Test
+    public void testGetNodeById() throws MonitorException {
+        List<ResourceNode> nodelist = rmc.getNodes();
+
+        ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId());
+
+        assertThat(node, is(not(nullValue())));
+
+        assertThat(node.getNodeId(), equalTo("localhost"));
+    }
+
+
+    @Test
+    public void testGetNodeLoad() throws MonitorException {
+
+        List<ResourceNode> nodelist = rmc.getNodes();
+
+        String node = rmc.getNodeLoad(nodelist.get(0).getNodeId());
+
+        assertNotNull(node);
+
+        assertThat(node, equalTo("0/8"));
+
+    }
+
+    @Test
+    public void testNodeReport() throws MonitorException {
+        String report = rmc.getNodeReport();
+
+        assertThat(report, is(not(nullValue())));
+    }
+
+    @Test
+    public void testGetNodesInQueue() throws QueueManagerException {
+        List<String> nodes = rmc.getNodesInQueue("long");
+
+        assertThat(nodes, is(not(nullValue())));
+
+        assertThat(nodes, hasSize(1));
+
+    }
+
+
+    @Test
+    public void testQueuedJobs() throws JobQueueException {
+        List jobs = rmc.getQueuedJobs();
+
+        assertThat(jobs, is(not(nullValue())));
+
+        //TODO queue a job
+    }
+
+    @Test
+    public void testQueuesWithNode() throws MonitorException, QueueManagerException {
+        List<ResourceNode> nodelist = rmc.getNodes();
+
+
+        List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId());
+        assertThat(queues, hasSize(3));
+
+        assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+    }
+
+    @Test
+    public void testQueues() throws QueueManagerException {
+        List<String> queues = rmc.getQueues();
+
+        assertThat(queues, hasSize(3));
+
+        assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        rm.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
index db5464b..b9b3860 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java
@@ -87,11 +87,10 @@ public class TestXmlRpcResourceManager extends TestCase {
    */
   @Override
   protected void setUp() throws Exception {
-
     System.out.println(NameValueJobInput.class.getCanonicalName());
-
     generateTestConfiguration();
     this.rm = new XmlRpcResourceManager(RM_PORT);
+    rm.startUp();
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
index cef79e4..8b1df40 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java
@@ -19,7 +19,7 @@ import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
 import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
 import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
 import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
-
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -33,7 +33,11 @@ import java.util.List;
 import java.util.Properties;
 
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -43,164 +47,166 @@ import static org.junit.Assert.fail;
  */
 public class TestXmlRpcResourceManagerClient {
 
-  private static final int RM_PORT = 50001;
-
-  private static XmlRpcResourceManagerClient rmc;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    generateTestConfiguration();
-    XmlRpcResourceManager rm = new XmlRpcResourceManager(RM_PORT);
-    rmc = new XmlRpcResourceManagerClient(new URL("http://localhost:" +RM_PORT));
+    private static final int RM_PORT = 50001;
 
-  }
+    private static ResourceManagerClient rmc;
+    private static ResourceManager rm;
 
-  private static void generateTestConfiguration() throws IOException {
-    Properties config = new Properties();
-
-    String propertiesFile = "." + File.separator + "src" + File.separator +
-                            "test" + File.separator + "resources" + File.separator + "test.resource.properties";
-    System.getProperties().load(new FileInputStream(new File(propertiesFile)));
-
-    // stage policy
-    File tmpPolicyDir = null;
-    try {
-      tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile();
-    } catch (Exception e) {
-      fail(e.getMessage());
-    }
-    for (File policyFile : new File("./src/test/resources/policy")
-        .listFiles(new FileFilter() {
-
-          @Override
-          public boolean accept(File pathname) {
-            return pathname.isFile() && pathname.getName().endsWith(".xml");
-          }
-        })) {
-      try {
-        FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir);
-      } catch (Exception e) {
-        fail(e.getMessage());
-      }
+    @BeforeClass
+    public static void setUp() throws Exception {
+        generateTestConfiguration();
+        rm = new XmlRpcResourceManager(RM_PORT);
+        rm.startUp();
+        rmc = new XmlRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT));
     }
 
-    config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir
-        .toURI().toString());
-    config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs",
-        tmpPolicyDir.toURI().toString());
+    private static void generateTestConfiguration() throws IOException {
+        Properties config = new Properties();
+
+        String propertiesFile = "." + File.separator + "src" + File.separator +
+                "test" + File.separator + "resources" + File.separator + "test.resource.properties";
+        System.getProperties().load(new FileInputStream(new File(propertiesFile)));
+
+        // stage policy
+        File tmpPolicyDir = null;
+        try {
+            tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile();
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+        for (File policyFile : new File("./src/test/resources/policy")
+                .listFiles(new FileFilter() {
+
+                    @Override
+                    public boolean accept(File pathname) {
+                        return pathname.isFile() && pathname.getName().endsWith(".xml");
+                    }
+                })) {
+            try {
+                FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir);
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+        }
+
+        config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir
+                .toURI().toString());
+        config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs",
+                tmpPolicyDir.toURI().toString());
+
+        System.getProperties().putAll(config);
 
-    System.getProperties().putAll(config);
-
-  }
-
-  @Test
-  public void testGetNodes() throws MonitorException {
-    List<Hashtable> nodes = rmc.getNodes();
-
-    assertThat(nodes, is(not(nullValue())));
-    assertThat(nodes, hasSize(1));
-
-  }
+    }
 
-  @Test
-  public void testGetExecutionReport() throws JobRepositoryException {
+    @Test
+    public void testGetNodes() throws MonitorException {
+        List<Hashtable> nodes = rmc.getNodes();
 
-    String execreport = rmc.getExecReport();
+        assertThat(nodes, is(not(nullValue())));
+        assertThat(nodes, hasSize(1));
 
+    }
 
-    assertThat(execreport, is(not(nullValue())));
-    //TODO make it return more than an empty string;
-  }
+    @Test
+    public void testGetExecutionReport() throws JobRepositoryException {
+        String execreport = rmc.getExecReport();
 
+        assertThat(execreport, is(not(nullValue())));
+        //TODO make it return more than an empty string;
+    }
 
-  @Test
-  public void testJobQueueCapacity() throws JobRepositoryException {
-    int capacity = rmc.getJobQueueCapacity();
 
-    assertThat(capacity, equalTo(1000));
+    @Test
+    public void testJobQueueCapacity() throws JobRepositoryException {
+        int capacity = rmc.getJobQueueCapacity();
 
-  }
+        assertThat(capacity, equalTo(1000));
 
-  @Test
-  public void testGetJobQueueSize() throws JobRepositoryException {
-    int size = rmc.getJobQueueSize();
+    }
 
-    assertThat(size, equalTo(0));
+    @Test
+    public void testGetJobQueueSize() throws JobRepositoryException {
+        int size = rmc.getJobQueueSize();
 
-    //TODO Make it change queue size
+        assertThat(size, equalTo(0));
 
-  }
+        //TODO Make it change queue size
 
-  @Test
-  public void testGetNodeById() throws MonitorException {
-    List<ResourceNode> nodelist = rmc.getNodes();
+    }
 
-    ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId());
+    @Test
+    public void testGetNodeById() throws MonitorException {
+        List<ResourceNode> nodelist = rmc.getNodes();
 
-    assertThat(node, is(not(nullValue())));
+        ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId());
 
-    assertThat(node.getNodeId(), equalTo("localhost"));
-  }
+        assertThat(node, is(not(nullValue())));
 
+        assertThat(node.getNodeId(), equalTo("localhost"));
+    }
 
-  @Test
-  public void testGetNodeLoad() throws MonitorException {
 
-    List<ResourceNode> nodelist = rmc.getNodes();
+    @Test
+    public void testGetNodeLoad() throws MonitorException {
 
-    String node = rmc.getNodeLoad(nodelist.get(0).getNodeId());
+        List<ResourceNode> nodelist = rmc.getNodes();
 
-    assertNotNull(node);
+        String node = rmc.getNodeLoad(nodelist.get(0).getNodeId());
 
-    assertThat(node, equalTo("0/8"));
+        assertNotNull(node);
 
-  }
+        assertThat(node, equalTo("0/8"));
 
-  @Test
-  public void testNodeReport() throws MonitorException {
-    String report = rmc.getNodeReport();
+    }
 
-    assertThat(report, is(not(nullValue())));
-  }
+    @Test
+    public void testNodeReport() throws MonitorException {
+        String report = rmc.getNodeReport();
 
-  @Test
-  public void testGetNodesInQueue() throws QueueManagerException {
-    List<String> nodes = rmc.getNodesInQueue("long");
+        assertThat(report, is(not(nullValue())));
+    }
 
-    assertThat(nodes, is(not(nullValue())));
+    @Test
+    public void testGetNodesInQueue() throws QueueManagerException {
+        List<String> nodes = rmc.getNodesInQueue("long");
 
-    assertThat(nodes, hasSize(1));
+        assertThat(nodes, is(not(nullValue())));
 
-  }
+        assertThat(nodes, hasSize(1));
+    }
 
 
-  @Test
-  public void testQueuedJobs() throws JobQueueException {
-    List jobs = rmc.getQueuedJobs();
+    @Test
+    public void testQueuedJobs() throws JobQueueException {
+        List jobs = rmc.getQueuedJobs();
 
-    assertThat(jobs, is(not(nullValue())));
+        assertThat(jobs, is(not(nullValue())));
 
-    //TODO queue a job
-  }
+        //TODO queue a job
+    }
 
-  @Test
-  public void testQueuesWithNode() throws MonitorException, QueueManagerException {
-    List<ResourceNode> nodelist = rmc.getNodes();
+    @Test
+    public void testQueuesWithNode() throws MonitorException, QueueManagerException {
+        List<ResourceNode> nodelist = rmc.getNodes();
 
 
-    List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId());
-    assertThat(queues, hasSize(3));
+        List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId());
+        assertThat(queues, hasSize(3));
 
-    assertThat(queues, containsInAnyOrder("high", "quick", "long"));
-  }
+        assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+    }
 
-  @Test
-  public void testQueues() throws QueueManagerException {
-    List<String> queues = rmc.getQueues();
+    @Test
+    public void testQueues() throws QueueManagerException {
+        List<String> queues = rmc.getQueues();
 
-    assertThat(queues, hasSize(3));
+        assertThat(queues, hasSize(3));
 
-    assertThat(queues, containsInAnyOrder("high", "quick", "long"));
-  }
+        assertThat(queues, containsInAnyOrder("high", "quick", "long"));
+    }
 
+    @AfterClass
+    public static void tearDown() {
+        rm.shutdown();
+    }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java
new file mode 100644
index 0000000..25fd2ee
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system.distributed;
+
+import org.apache.oodt.cas.resource.system.AvroRpcResourceManager;
+import org.apache.oodt.cas.resource.system.ResourceManager;
+import org.apache.oodt.cas.resource.system.TestAvroRpcResourceManager;
+import org.apache.oodt.config.distributed.cli.ConfigPublisher;
+import org.apache.oodt.config.test.AbstractDistributedConfigurationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.oodt.config.Constants.Properties.ENABLE_DISTRIBUTED_CONFIGURATION;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the operation of Resource Manager under distributed configuration management enabled
+ *
+ * @author Imesha Sudasingha
+ */
+public class TestDistributedAvroRpcResourceManager extends AbstractDistributedConfigurationTest {
+
+    private static final int RM_PORT = 50001;
+    private static final String CONF_PUBLISHER_XML = "config/distributed/config-publisher.xml";
+
+    private ResourceManager resourceManager;
+
+    @Before
+    public void setUpTest() throws Exception {
+        System.setProperty("org.apache.oodt.cas.cli.action.spring.config", "../config/src/main/resources/cmd-line-actions.xml");
+        System.setProperty("org.apache.oodt.cas.cli.option.spring.config", "../config/src/main/resources/cmd-line-options.xml");
+        System.setProperty(ENABLE_DISTRIBUTED_CONFIGURATION, "true");
+
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONF_PUBLISHER_XML,
+                "-a", "publish"
+        });
+
+        try {
+            resourceManager = new AvroRpcResourceManager(RM_PORT);
+            resourceManager.startUp();
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDynSetNodeCapacity() {
+        new TestAvroRpcResourceManager().testDynSetNodeCapacity();
+    }
+
+    @After
+    public void tearDownTest() throws Exception {
+        if (resourceManager != null) {
+            resourceManager.shutdown();
+        }
+
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONF_PUBLISHER_XML,
+                "-a", "clear"
+        });
+
+        System.clearProperty("org.apache.oodt.cas.cli.action.spring.config");
+        System.clearProperty("org.apache.oodt.cas.cli.option.spring.config");
+        System.clearProperty(ENABLE_DISTRIBUTED_CONFIGURATION);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
index 0649b44..16a2b6f 100644
--- a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java
@@ -54,6 +54,7 @@ public class TestDistributedXmlRpcResourceManager extends AbstractDistributedCon
 
         try {
             resourceManager = new XmlRpcResourceManager(RM_PORT);
+            resourceManager.startUp();
         } catch (Exception e) {
             fail(e.getMessage());
         }


[2/2] oodt git commit: Altered resource manager client CLI to use avro resource manager client

Posted by ma...@apache.org.
Altered resource manager client CLI to use avro resource manager client


Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/9feacbb0
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/9feacbb0
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/9feacbb0

Branch: refs/heads/development
Commit: 9feacbb01b573c0eaf9f85972414ac9eaa784841
Parents: 1b921b5
Author: Imesha Sudasingha <im...@gmail.com>
Authored: Mon Mar 5 18:35:58 2018 +0530
Committer: Imesha Sudasingha <im...@gmail.com>
Committed: Mon Mar 5 18:35:58 2018 +0530

----------------------------------------------------------------------
 resource/src/main/bin/resmgr-client             |  2 +-
 .../resource/cli/action/ResourceCliAction.java  | 56 ++++++++--------
 .../system/ResourceManagerClientMain.java       | 28 ++++++++
 .../resource/system/ResourceManagerMain.java    | 14 ++--
 .../system/rpc/ResourceManagerFactory.java      | 67 ++++++++++++++++++++
 5 files changed, 130 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/bin/resmgr-client
----------------------------------------------------------------------
diff --git a/resource/src/main/bin/resmgr-client b/resource/src/main/bin/resmgr-client
index e7dbd3d..0376b66 100644
--- a/resource/src/main/bin/resmgr-client
+++ b/resource/src/main/bin/resmgr-client
@@ -31,4 +31,4 @@ $JAVA_HOME/bin/java \
         -Djava.util.logging.config.file=../etc/logging.properties \
         -Dorg.apache.oodt.cas.cli.action.spring.config=../policy/cmd-line-actions.xml \
         -Dorg.apache.oodt.cas.cli.option.spring.config=../policy/cmd-line-options.xml \
-        org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient $*
+        org.apache.oodt.cas.resource.system.ResourceManagerClientMain $*

http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java b/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
index 5cbb360..114be7f 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/cli/action/ResourceCliAction.java
@@ -16,16 +16,17 @@
  */
 package org.apache.oodt.cas.resource.cli.action;
 
-//JDK imports
-import java.net.MalformedURLException;
-import java.net.URL;
 
 //Apache imports
 import org.apache.commons.lang.Validate;
-
-//OODT imports
 import org.apache.oodt.cas.cli.action.CmdLineAction;
-import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.resource.system.ResourceManagerClient;
+import org.apache.oodt.cas.resource.system.rpc.ResourceManagerFactory;
+
+//JDK imports
+import java.net.MalformedURLException;
+import java.net.URL;
+
 
 /**
  * Base {@link CmdLineAction} for Workflow Manager.
@@ -34,24 +35,27 @@ import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
  */
 public abstract class ResourceCliAction extends CmdLineAction {
 
-   private XmlRpcResourceManagerClient client;
-
-   public String getUrl() {
-      return System.getProperty("org.apache.oodt.cas.resource.url");
-   }
-
-   protected XmlRpcResourceManagerClient getClient()
-         throws MalformedURLException {
-      Validate.notNull(getUrl());
-
-      if (client != null) {
-         return client;
-      } else {
-         return new XmlRpcResourceManagerClient(new URL(getUrl()));
-      }
-   }
-
-   public void setClient(XmlRpcResourceManagerClient client) {
-      this.client = client;
-   }
+    private ResourceManagerClient client;
+
+    public String getUrl() {
+        return System.getProperty("org.apache.oodt.cas.resource.url");
+    }
+
+    protected ResourceManagerClient getClient() throws MalformedURLException {
+        Validate.notNull(getUrl());
+
+        if (client != null) {
+            return client;
+        } else {
+            try {
+                return ResourceManagerFactory.getResourceManagerClient(new URL(getUrl()));
+            } catch (Exception e) {
+                throw new IllegalStateException("Unable to create client", e);
+            }
+        }
+    }
+
+    public void setClient(ResourceManagerClient client) {
+        this.client = client;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java
new file mode 100644
index 0000000..c0750fe
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClientMain.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.oodt.cas.cli.CmdLineUtility;
+
+public class ResourceManagerClientMain {
+
+    public static void main(String[] args) {
+        CmdLineUtility cmdLineUtility = new CmdLineUtility();
+        cmdLineUtility.run(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
index 0880da1..162d64e 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java
@@ -17,11 +17,10 @@
 
 package org.apache.oodt.cas.resource.system;
 
+import org.apache.oodt.cas.resource.system.rpc.ResourceManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-
 public class ResourceManagerMain {
 
     private static final Logger logger = LoggerFactory.getLogger(ResourceManagerMain.class);
@@ -41,13 +40,7 @@ public class ResourceManagerMain {
             System.exit(1);
         }
 
-        String resourceManagerClass = System.getProperty("resmgr.manager",
-                "org.apache.oodt.cas.resource.system.AvroRpcResourceManager");
-
-        logger.info("Starting resource manager {} at port: {}", resourceManagerClass, portNum);
-
-        Constructor<?> constructor = Class.forName(resourceManagerClass).getConstructor(Integer.TYPE);
-        final ResourceManager manager = (ResourceManager) constructor.newInstance(portNum);
+        final ResourceManager manager = ResourceManagerFactory.getResourceManager(portNum);
         manager.startUp();
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -57,10 +50,11 @@ public class ResourceManagerMain {
             }
         });
 
-        for (; ; )
+        for (; ; ) {
             try {
                 Thread.currentThread().join();
             } catch (InterruptedException ignore) {
             }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/9feacbb0/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java
new file mode 100644
index 0000000..9287f6e
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/rpc/ResourceManagerFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system.rpc;
+
+import org.apache.oodt.cas.resource.system.ResourceManager;
+import org.apache.oodt.cas.resource.system.ResourceManagerClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.net.URL;
+
+public class ResourceManagerFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(ResourceManagerFactory.class);
+
+    public static ResourceManager getResourceManager(int port) throws Exception {
+        String resourceManagerClass = System.getProperty("resmgr.manager",
+                "org.apache.oodt.cas.resource.system.AvroRpcResourceManager");
+
+        logger.info("Creating resource manager {} at port: {}", resourceManagerClass, port);
+
+        ResourceManager manager;
+        try {
+            Constructor<?> constructor = Class.forName(resourceManagerClass).getConstructor(Integer.TYPE);
+            manager = (ResourceManager) constructor.newInstance(port);
+        } catch (Exception e) {
+            logger.error("Unable to create resource manager", e);
+            throw e;
+        }
+
+        return manager;
+    }
+
+    public static ResourceManagerClient getResourceManagerClient(URL url) throws Exception {
+        String resMgrClientClass = System.getProperty("resmgr.manager.client",
+                "org.apache.oodt.cas.resource.system.AvroRpcResourceManagerClient");
+
+        logger.info("Creating resource manager client {}", resMgrClientClass);
+
+        ResourceManagerClient client;
+        try {
+            Constructor<?> constructor = Class.forName(resMgrClientClass).getConstructor(URL.class);
+            client = (ResourceManagerClient) constructor.newInstance(url);
+        } catch (Exception e) {
+            logger.error("Unable to create resource manager", e);
+            throw e;
+        }
+
+        return client;
+    }
+}