You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2017/10/15 20:34:14 UTC

[2/7] oodt git commit: avro rpc implementation

avro rpc implementation


Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/287d4e89
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/287d4e89
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/287d4e89

Branch: refs/heads/development
Commit: 287d4e8979e9c7d54648688d06374e8d8a2dddc4
Parents: f91720d
Author: Radu Manole <ma...@gmail.com>
Authored: Mon Aug 17 18:11:09 2015 +0300
Committer: Radu Manole <ma...@gmail.com>
Committed: Mon Aug 17 18:11:09 2015 +0300

----------------------------------------------------------------------
 resource/pom.xml                                |  52 +++
 resource/src/main/avro/types/AvroJob.avsc       |  17 +
 resource/src/main/avro/types/AvroJobInput.avsc  |  11 +
 .../main/avro/types/AvroNameValueJobInput.avsc  |  10 +
 .../src/main/avro/types/AvroResourceNode.avsc   |  11 +
 .../avro/types/resource_manager_protocol.avdl   |  53 +++
 .../src/main/avro/types/tatchmgr_protocol.avdl  |  27 ++
 .../cas/resource/batchmgr/AvroRpcBatchMgr.java  | 180 ++++++++
 .../batchmgr/AvroRpcBatchMgrFactory.java        |  32 ++
 .../resource/batchmgr/AvroRpcBatchMgrProxy.java | 135 ++++++
 .../cas/resource/structs/AvroTypeFactory.java   | 168 ++++++++
 .../cas/resource/structs/NameValueJobInput.java |   4 +
 .../resource/system/AvroRpcResourceManager.java | 425 +++++++++++++++++++
 .../system/AvroRpcResourceManagerClient.java    | 305 +++++++++++++
 .../cas/resource/system/ResourceManager.java    |  31 ++
 .../resource/system/ResourceManagerClient.java  |  80 ++++
 .../system/XmlRpcResourceManagerClient.java     |  26 +-
 .../system/extern/AvroRpcBatchStub.java         | 212 +++++++++
 .../cas/resource/batchmgr/TestBatchMgr.java     |  54 +++
 .../resource/structs/TestAvroTypeFactory.java   | 112 +++++
 .../system/TestAvroRpcResourceManager.java      | 159 +++++++
 .../system/TestXmlRpcResourceManager.java       |   4 +
 22 files changed, 2107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/pom.xml
----------------------------------------------------------------------
diff --git a/resource/pom.xml b/resource/pom.xml
index de807d6..b91fa84 100644
--- a/resource/pom.xml
+++ b/resource/pom.xml
@@ -35,6 +35,48 @@ the License.
   </scm>
   <build>
     <plugins>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.3.2</version>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.7.7</version>
+        <configuration>
+          <stringType>String</stringType>
+          <detail>true</detail>
+        </configuration>
+        <executions>
+          <execution>
+            <id>schemas</id>
+            <configuration>
+              <imports>
+                <import>${basedir}/src/main/avro/types/AvroJob.avsc</import>
+                <import>${basedir}/src/main/avro/types/AvroNameValueJobInput.avsc</import>
+                <import>${basedir}/src/main/avro/types/AvroJobInput.avsc</import>
+              </imports>
+            </configuration>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>protocol</id>
+            <configuration><imports>
+              <import>${basedir}/src/main/avro/types</import>
+
+            </imports>
+            </configuration>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>2.4</version>
@@ -81,6 +123,16 @@ the License.
   </build>
   <dependencies>
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.oodt</groupId>
       <artifactId>cas-metadata</artifactId>
       <version>${project.parent.version}</version>

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroJob.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroJob.avsc b/resource/src/main/avro/types/AvroJob.avsc
new file mode 100644
index 0000000..7efa842
--- /dev/null
+++ b/resource/src/main/avro/types/AvroJob.avsc
@@ -0,0 +1,17 @@
+{
+  "type":"record",
+  "name":"AvroJob",
+  "default":null,
+  "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+  "imports":[],
+  "fields":[
+    {"name":"id","type":"string"},
+    {"name":"name","type":"string"},
+    {"name":"jobInstanceClassName","type":"string"},
+    {"name":"jobInputClassName","type":"string"},
+    {"name":"queueName","type":"string"},
+    {"name":"loadValue","type":"int"},
+    {"name":"status","type":"string"}
+
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroJobInput.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroJobInput.avsc b/resource/src/main/avro/types/AvroJobInput.avsc
new file mode 100644
index 0000000..d915769
--- /dev/null
+++ b/resource/src/main/avro/types/AvroJobInput.avsc
@@ -0,0 +1,11 @@
+{
+  "type":"record",
+  "name":"AvroJobInput",
+  "default":null,
+  "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+  "imports":["AvroNameValueJobInput.avsc"],
+  "fields":[
+    {"name":"className","type":"string"},
+    {"name":"imple","type":["AvroNameValueJobInput","null"]}
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroNameValueJobInput.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroNameValueJobInput.avsc b/resource/src/main/avro/types/AvroNameValueJobInput.avsc
new file mode 100644
index 0000000..8a1607a
--- /dev/null
+++ b/resource/src/main/avro/types/AvroNameValueJobInput.avsc
@@ -0,0 +1,10 @@
+{
+  "type":"record",
+  "name":"AvroNameValueJobInput",
+  "default":null,
+  "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+  "imports":[],
+  "fields":[
+    {"name":"props","type":[{"type":"map","values":"string"},"null"]}
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroResourceNode.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroResourceNode.avsc b/resource/src/main/avro/types/AvroResourceNode.avsc
new file mode 100644
index 0000000..11509f3
--- /dev/null
+++ b/resource/src/main/avro/types/AvroResourceNode.avsc
@@ -0,0 +1,11 @@
+{
+  "type":"record",
+  "name":"AvroResourceNode",
+  "default":null,
+  "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+  "fields":[
+    {"name":"nodeId","type":"string"},
+    {"name":"ipAddr","type":"string"},
+    {"name":"capacity","type":"int","default":0}
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/resource_manager_protocol.avdl
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/resource_manager_protocol.avdl b/resource/src/main/avro/types/resource_manager_protocol.avdl
new file mode 100644
index 0000000..8b3f43f
--- /dev/null
+++ b/resource/src/main/avro/types/resource_manager_protocol.avdl
@@ -0,0 +1,53 @@
+// Avro IDL definition of the ResourceManager RPC protocol. Generated types
+// are placed in the namespace given by the annotation below.
+@namespace("org.apache.oodt.cas.resource.structs.avrotypes")
+
+protocol ResourceManager {
+// Record types used in the messages below come from the .avsc schema files
+// imported here (paths are relative to this file).
+import schema "AvroJob.avsc";
+import schema "AvroNameValueJobInput.avsc";
+import schema "AvroJobInput.avsc";
+import schema "AvroResourceNode.avsc";
+
+    // --- job queries and control -------------------------------------------
+
+    boolean isJobComplete(string jobId);
+
+    AvroJob getJobInfo(string jobId);
+
+    boolean isAlive();
+
+    int getJobQueueSize();
+
+    int getJobQueueCapacity();
+
+    boolean killJob(string jobId);
+
+    string getExecutionNode(string jobId);
+
+    // NOTE(review): the second parameter is named `into` here but `in` in
+    // handleJobWithUrl below - looks like a typo. Parameter names are part of
+    // the Avro message schema, so confirm with all peers before renaming.
+    string handleJob(AvroJob exec, AvroJobInput into);
+
+    boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, string hostUrl);
+
+    // --- node and queue administration -------------------------------------
+
+    array<AvroResourceNode> getNodes();
+
+    AvroResourceNode getNodeById(string nodeId);
+
+    boolean addQueue(string queueName);
+
+    boolean removeQueue(string queueName);
+
+    boolean addNode(AvroResourceNode node);
+
+    boolean removeNode(string nodeId);
+
+    boolean setNodeCapacity(string nodeId, int capacity);
+
+    boolean addNodeToQueue(string nodeId, string queueName);
+
+    boolean removeNodeFromQueue(string nodeId, string queueName);
+
+    array<string> getQueues();
+
+    array<string> getNodesInQueue(string queueName);
+
+    array<string> getQueuesWithNode(string nodeId);
+
+    // Returns the load as a string rather than a number.
+    string getNodeLoad(string nodeId);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/tatchmgr_protocol.avdl
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/tatchmgr_protocol.avdl b/resource/src/main/avro/types/tatchmgr_protocol.avdl
new file mode 100644
index 0000000..3e424ff
--- /dev/null
+++ b/resource/src/main/avro/types/tatchmgr_protocol.avdl
@@ -0,0 +1,27 @@
+// Avro IDL definition of the batch-manager RPC protocol (liveness check, job
+// execution and job kill). Generated types are placed in the namespace given
+// by the annotation below.
+@namespace("org.apache.oodt.cas.resource.structs.avrotypes")
+
+protocol AvroIntrBatchmgr {
+// Record types used in the messages below come from these .avsc schema files.
+import schema "AvroJob.avsc";
+import schema "AvroNameValueJobInput.avsc";
+import schema "AvroJobInput.avsc";
+import schema "AvroResourceNode.avsc";
+
+    boolean isAlive();
+
+    boolean executeJob(AvroJob avroJob, AvroJobInput jobInput);
+
+// The signatures below are commented out and therefore not part of the
+// protocol. They are Java-style overloads of executeJob taking a Hashtable
+// plus one typed input each; only the AvroJobInput form above is active.
+//    public boolean executeJob(Hashtable jobHash, Date jobInput);
+//
+//    public boolean executeJob(Hashtable jobHash, double jobInput);
+//
+//    public boolean executeJob(Hashtable jobHash, int jobInput);
+//
+//    public boolean executeJob(Hashtable jobHash, boolean jobInput);
+//
+//    public boolean executeJob(Hashtable jobHash, Vector jobInput);
+//
+//    public boolean executeJob(Hashtable jobHash, byte[] jobInput);
+
+    // The parameter is an AvroJob record despite being named `jobHash`.
+    boolean killJob(AvroJob jobHash);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java
new file mode 100644
index 0000000..483754f
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.JobStatus;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link Batchmgr} that runs each job on its remote node through an
+ * {@link AvroRpcBatchMgrProxy} thread and writes the resulting job status
+ * back to the configured {@link JobRepository}.
+ *
+ * <p>{@link #setMonitor(Monitor)} and {@link #setJobRepository(JobRepository)}
+ * must be called before jobs are executed or killed; the status callbacks
+ * dereference both without a null check.</p>
+ */
+public class AvroRpcBatchMgr implements Batchmgr {
+
+    /* our log stream */
+    private static final Logger LOG = Logger.getLogger(AvroRpcBatchMgr.class
+            .getName());
+
+    private Monitor mon;
+
+    private JobRepository repo;
+
+    /* job id -> id of the node the job was dispatched to; guarded by itself */
+    private final Map<String, String> nodeToJobMap;
+
+    /* job id -> proxy thread executing that job; guarded by itself */
+    private final Map<String, AvroRpcBatchMgrProxy> specToProxyMap;
+
+    public AvroRpcBatchMgr(){
+        nodeToJobMap = new HashMap<String, String>();
+        specToProxyMap = new HashMap<String, AvroRpcBatchMgrProxy>();
+    }
+
+    /**
+     * Starts a proxy thread that executes {@code jobSpec} on {@code resNode}.
+     *
+     * @param jobSpec the job to run
+     * @param resNode the node to run it on
+     * @return always {@code true}; the job outcome is reported asynchronously
+     *         through the {@code job*} callbacks of this class
+     * @throws JobExecutionException if the node does not answer the liveness check
+     */
+    @Override
+    public boolean executeRemotely(JobSpec jobSpec, ResourceNode resNode) throws JobExecutionException {
+        AvroRpcBatchMgrProxy proxy = new AvroRpcBatchMgrProxy(jobSpec, resNode, this);
+        if (!proxy.nodeAlive()) {
+            throw new JobExecutionException("Node: [" + resNode.getNodeId()
+                    + "] is down: Unable to execute job!");
+        }
+
+        String jobId = jobSpec.getJob().getId();
+
+        synchronized (this.specToProxyMap) {
+            this.specToProxyMap.put(jobId, proxy);
+        }
+
+        synchronized (this.nodeToJobMap) {
+            this.nodeToJobMap.put(jobId, resNode.getNodeId());
+        }
+
+        proxy.start();
+
+        return true;
+
+    }
+
+    @Override
+    public void setMonitor(Monitor monitor) {
+        this.mon = monitor;
+    }
+
+    @Override
+    public void setJobRepository(JobRepository repository) {
+        this.repo = repository;
+    }
+
+    /**
+     * @param jobId the id of a dispatched job
+     * @return the id of the node the job was sent to, or {@code null} when the
+     *         job is unknown or has already finished
+     */
+    @Override
+    public String getExecutionNode(String jobId) {
+        // Proxy threads mutate this map from their own threads, so reads take
+        // the same lock as the writers.
+        synchronized (this.nodeToJobMap) {
+            return this.nodeToJobMap.get(jobId);
+        }
+    }
+
+    /**
+     * Asks {@code node} to kill the job with the given id.
+     *
+     * @return {@code true} if the remote side reported the job as killed;
+     *         {@code false} if the job could not be looked up or the kill failed
+     */
+    @Override
+    public boolean killJob(String jobId, ResourceNode node) {
+        JobSpec spec;
+        try {
+            spec = repo.getJobById(jobId);
+        } catch (Exception e) {
+            LOG.log(Level.WARNING, "Unable to get job by id: [" + jobId
+                    + "] to kill it: Message: " + e.getMessage());
+            return false;
+        }
+
+        // A fresh proxy is used for the kill request; the proxy thread that is
+        // running the job (if any) is left untouched.
+        AvroRpcBatchMgrProxy proxy = new AvroRpcBatchMgrProxy(spec, node, this);
+        return proxy.killJob();
+    }
+
+    /** Tells the monitor that {@code node} no longer carries the load of {@code jobSpec}. */
+    protected void notifyMonitor(ResourceNode node, JobSpec jobSpec) {
+        Job job = jobSpec.getJob();
+        int reducedLoad = job.getLoadValue().intValue();
+        try {
+            mon.reduceLoad(node, reducedLoad);
+        } catch (MonitorException e) {
+            // Best effort: the job outcome is already recorded, so only report it.
+            LOG.log(Level.WARNING, "Unable to reduce load on node: ["
+                    + node.getNodeId() + "] for job: [" + job.getId()
+                    + "]: Message: " + e.getMessage());
+        }
+    }
+
+    /** Marks the job as successful, forgets its bookkeeping and persists the status. */
+    protected void jobSuccess(JobSpec spec) {
+        spec.getJob().setStatus(JobStatus.SUCCESS);
+        forgetJob(spec.getJob().getId());
+        persistStatus(spec, "Error set job completion status for job: [");
+    }
+
+    /** Marks the job as failed, forgets its bookkeeping and persists the status. */
+    protected void jobFailure(JobSpec spec) {
+        spec.getJob().setStatus(JobStatus.FAILURE);
+        forgetJob(spec.getJob().getId());
+        persistStatus(spec, "Error set job completion status for job: [");
+    }
+
+    /** Marks the job as killed, drops its node mapping and persists the status. */
+    protected void jobKilled(JobSpec spec) {
+        spec.getJob().setStatus(JobStatus.KILLED);
+        synchronized (this.nodeToJobMap) {
+            this.nodeToJobMap.remove(spec.getJob().getId());
+        }
+        persistStatus(spec, "Error setting job killed status for job: [");
+    }
+
+    /** Marks the job as executing and persists the status. */
+    protected void jobExecuting(JobSpec spec) {
+        spec.getJob().setStatus(JobStatus.EXECUTED);
+        persistStatus(spec, "Error setting job execution status for job: [");
+    }
+
+    /** Removes the node and proxy entries kept for a finished job. */
+    private void forgetJob(String jobId) {
+        synchronized (this.nodeToJobMap) {
+            this.nodeToJobMap.remove(jobId);
+        }
+        synchronized (this.specToProxyMap) {
+            // The stored values are AvroRpcBatchMgrProxy instances; no cast to
+            // the XML-RPC proxy type may be applied here.
+            this.specToProxyMap.remove(jobId);
+        }
+    }
+
+    /** Writes the job back to the repository, logging (not throwing) on failure. */
+    private void persistStatus(JobSpec spec, String errorPrefix) {
+        try {
+            repo.updateJob(spec);
+        } catch (JobRepositoryException e) {
+            LOG.log(Level.WARNING, errorPrefix
+                    + spec.getJob().getId() + "]: Message: " + e.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java
new file mode 100644
index 0000000..fe00741
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import org.apache.oodt.cas.resource.monitor.Monitor;
+
+/**
+ * {@link BatchmgrFactory} that creates {@link AvroRpcBatchMgr} instances.
+ */
+public class AvroRpcBatchMgrFactory implements BatchmgrFactory {
+
+    public AvroRpcBatchMgrFactory(){}
+
+    /**
+     * @return a new, unconfigured {@link AvroRpcBatchMgr}
+     */
+    @Override
+    public Batchmgr createBatchmgr() {
+        return new AvroRpcBatchMgr();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java
new file mode 100644
index 0000000..98a717a
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.system.extern.AvroRpcBatchStub;
+import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Talks to the batch stub on one remote node over Avro RPC on behalf of an
+ * {@link AvroRpcBatchMgr}. As a thread, {@link #run()} executes the job and
+ * reports the outcome to the parent manager; {@link #nodeAlive()} and
+ * {@link #killJob()} are synchronous helpers.
+ *
+ * <p>Every operation opens its own connection and closes it when done.</p>
+ */
+public class AvroRpcBatchMgrProxy extends Thread implements Runnable {
+
+    private static final Logger LOG = Logger.getLogger(AvroRpcBatchMgrProxy.class.getName());
+
+    private JobSpec jobSpec;
+
+    private ResourceNode remoteHost;
+
+    private Transceiver client;
+
+    // NOTE(review): SpecificRequestor.getClient expects the generated protocol
+    // interface. If AvroRpcBatchStub is the server-side implementation class
+    // rather than an interface, this should be the interface generated from
+    // the AvroIntrBatchmgr protocol instead - confirm.
+    private AvroRpcBatchStub proxy;
+
+    private AvroRpcBatchMgr parent;
+
+    public AvroRpcBatchMgrProxy(JobSpec jobSpec, ResourceNode remoteHost,
+                               AvroRpcBatchMgr par) {
+        this.jobSpec = jobSpec;
+        this.remoteHost = remoteHost;
+        this.parent = par;
+    }
+
+    /**
+     * @return {@code true} only if a connection could be opened and the remote
+     *         stub answered {@code isAlive()} with {@code true}
+     */
+    public boolean nodeAlive() {
+        if (!connect()) {
+            return false;
+        }
+
+        try {
+            return proxy.isAlive();
+        } catch (AvroRemoteException e) {
+            return false;
+        } finally {
+            disconnect();
+        }
+    }
+
+    /**
+     * Asks the remote stub to kill this proxy's job and, on success, notifies
+     * the parent manager.
+     *
+     * @return {@code true} if the remote stub reported the job as killed
+     */
+    public boolean killJob() {
+        if (!connect()) {
+            return false;
+        }
+
+        boolean result;
+        try {
+            result = proxy.killJob(AvroTypeFactory.getAvroJob(jobSpec.getJob()));
+        } catch (AvroRemoteException e) {
+            LOG.log(Level.WARNING, "Unable to kill job '" + jobSpec.getJob().getId()
+                    + "' : " + e.getMessage(), e);
+            result = false;
+        } finally {
+            disconnect();
+        }
+
+        if (result) {
+            parent.jobKilled(jobSpec);
+        }
+
+        return result;
+    }
+
+    /**
+     * Executes the job remotely and reports executing/success/failure to the
+     * parent manager. The monitor is always notified at the end, whatever the
+     * outcome.
+     */
+    @Override
+    public void run() {
+        try {
+            if (!connect()) {
+                throw new IOException("Unable to connect to node ["
+                        + remoteHost.getNodeId() + "]");
+            }
+            parent.jobExecuting(jobSpec);
+            boolean result = proxy.executeJob(AvroTypeFactory.getAvroJob(jobSpec.getJob()),
+                    AvroTypeFactory.getAvroJobInput(jobSpec.getIn()));
+            if (!result) {
+                throw new IllegalStateException("batchstub.executeJob returned false");
+            }
+            parent.jobSuccess(jobSpec);
+        } catch (Exception e) {
+            // Thread boundary: anything that goes wrong counts as a job failure.
+            LOG.log(Level.SEVERE, "Job execution failed for jobId '" + jobSpec.getJob().getId() + "' : " + e.getMessage(), e);
+            parent.jobFailure(jobSpec);
+        } finally {
+            disconnect();
+            parent.notifyMonitor(remoteHost, jobSpec);
+        }
+
+    }
+
+    /**
+     * Opens a transceiver to the remote node and builds the RPC client on it.
+     *
+     * @return {@code false} (after logging) if the connection could not be set up
+     */
+    private boolean connect() {
+        try {
+            // Use host AND port of the node URL; an address built from the port
+            // alone is a wildcard address and never reaches a remote node.
+            this.client = new NettyTransceiver(new InetSocketAddress(
+                    remoteHost.getIpAddr().getHost(), remoteHost.getIpAddr().getPort()));
+            this.proxy = (AvroRpcBatchStub) SpecificRequestor.getClient(AvroRpcBatchStub.class, client);
+            return true;
+        } catch (IOException e) {
+            LOG.log(Level.SEVERE, "Failed connection with the server.", e);
+            disconnect();
+            return false;
+        }
+    }
+
+    /** Closes the current transceiver, if any, and drops the RPC client. */
+    private void disconnect() {
+        if (this.client != null) {
+            try {
+                this.client.close();
+            } catch (IOException e) {
+                LOG.log(Level.WARNING, "Failed to close connection with the server.", e);
+            }
+            this.client = null;
+        }
+        this.proxy = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java
new file mode 100644
index 0000000..70a2d88
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.structs;
+
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.reflect.AvroName;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.resource.structs.avrotypes.*;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * Static converters between the resource manager's own structs
+ * ({@link Job}, {@link JobInput}, {@link ResourceNode}) and their Avro
+ * counterparts used on the wire.
+ */
+public class AvroTypeFactory {
+
+    private static final java.util.logging.Logger LOG =
+            java.util.logging.Logger.getLogger(AvroTypeFactory.class.getName());
+
+    /** Copies every field of {@code avroJob} into a new {@link Job}. */
+    public static Job getJob(AvroJob avroJob) {
+        Job job = new Job();
+        job.setId(avroJob.getId());
+        job.setName(avroJob.getName());
+        job.setJobInstanceClassName(avroJob.getJobInstanceClassName());
+        job.setJobInputClassName(avroJob.getJobInputClassName());
+        job.setQueueName(avroJob.getQueueName());
+        job.setLoadValue(avroJob.getLoadValue());
+        job.setStatus(avroJob.getStatus());
+
+        return job;
+    }
+
+    /** Copies every field of {@code job} into a new {@link AvroJob}. */
+    public static AvroJob getAvroJob(Job job) {
+        AvroJob avroJob = new AvroJob();
+        avroJob.setId(job.getId());
+        avroJob.setName(job.getName());
+        avroJob.setJobInstanceClassName(job.getJobInstanceClassName());
+        avroJob.setJobInputClassName(job.getJobInputClassName());
+        avroJob.setQueueName(job.getQueueName());
+        // Load value and status must come from the source job; reading them
+        // from the freshly created avroJob would silently drop both.
+        avroJob.setLoadValue(job.getLoadValue());
+        avroJob.setStatus(job.getStatus());
+
+        return avroJob;
+    }
+
+    //
+
+    /**
+     * Instantiates the {@link JobInput} named by {@code avroJobInput} and fills
+     * it from the Avro payload when the type is one this factory knows.
+     */
+    public static JobInput getJobInput(AvroJobInput avroJobInput){
+        JobInput jobInput = GenericResourceManagerObjectFactory
+                .getJobInputFromClassName(avroJobInput.getClassName());
+
+        return setJobInputInplementation(jobInput,avroJobInput);
+    }
+
+    /**
+     * Wraps {@code jobInput} into an {@link AvroJobInput}, recording its class
+     * name and, for {@link NameValueJobInput}, its properties.
+     */
+    public static AvroJobInput getAvroJobInput(JobInput jobInput){
+        AvroJobInput avroJobInput = new AvroJobInput();
+        // getName() is the binary name that Class.forName accepts;
+        // getCanonicalName() is null for anonymous classes and uses '.' for
+        // nested ones. Both are identical for top-level classes.
+        avroJobInput.setClassName(jobInput.getClass().getName());
+
+        return setAvroJobInputInplementation(avroJobInput,jobInput);
+    }
+
+    /** Copies the Avro name/value payload, if present, into {@code jobInput}. */
+    private static JobInput setJobInputInplementation(JobInput jobInput,AvroJobInput avroJobInput){
+
+        if (jobInput instanceof NameValueJobInput) {
+            NameValueJobInput nameValueJobInput = (NameValueJobInput) jobInput;
+            AvroNameValueJobInput avroNameValueJobInput = (AvroNameValueJobInput) avroJobInput.getImple();
+            // Both "imple" and "props" are nullable unions in the schema.
+            if (avroNameValueJobInput != null && avroNameValueJobInput.getProps() != null) {
+                Map<String, String> props = avroNameValueJobInput.getProps();
+                for (Map.Entry<String, String> entry : props.entrySet()) {
+                    nameValueJobInput.setNameValuePair(entry.getKey(), entry.getValue());
+                }
+            }
+            return nameValueJobInput;
+        }
+
+        return jobInput;
+    }
+
+    /** Copies the properties of a {@link NameValueJobInput} into {@code avroJobInput}. */
+    private static AvroJobInput setAvroJobInputInplementation(AvroJobInput avroJobInput,JobInput jobInput){
+
+        if (jobInput instanceof NameValueJobInput) {
+            NameValueJobInput nameValueJobInput = (NameValueJobInput) jobInput;
+
+            AvroNameValueJobInput avroNameValueJobInput = new AvroNameValueJobInput();
+            avroNameValueJobInput.setProps(getMap(nameValueJobInput.getProps()));
+            avroJobInput.setImple(avroNameValueJobInput);
+        }
+        return avroJobInput;
+    }
+
+    /** Copies a table whose keys and values are all strings into a new map. */
+    private static Map<String,String> getMap(Hashtable<?, ?> hashtable){
+        Map<String,String> map = new HashMap<String, String>();
+        for (Map.Entry<?, ?> entry : hashtable.entrySet()) {
+            map.put((String) entry.getKey(), (String) entry.getValue());
+        }
+        return map;
+    }
+
+    //
+
+    /**
+     * Converts an {@link AvroResourceNode} into a {@link ResourceNode}. If the
+     * address is not a valid URL the problem is logged and the address of the
+     * returned node is left unset.
+     */
+    public static ResourceNode getResourceNode(AvroResourceNode avroResourceNode){
+        ResourceNode resourceNode = new ResourceNode();
+        resourceNode.setId(avroResourceNode.getNodeId());
+        try {
+            resourceNode.setIpAddr(new URL(avroResourceNode.getIpAddr()));
+        } catch (MalformedURLException e) {
+            LOG.log(java.util.logging.Level.WARNING, "Malformed address ["
+                    + avroResourceNode.getIpAddr() + "] for node: ["
+                    + avroResourceNode.getNodeId() + "]: Message: " + e.getMessage(), e);
+        }
+        resourceNode.setCapacity(avroResourceNode.getCapacity());
+        return resourceNode;
+    }
+
+    /** Converts a {@link ResourceNode} into an {@link AvroResourceNode}. */
+    public static AvroResourceNode getAvroResourceNode(ResourceNode resourceNode){
+        AvroResourceNode avroResourceNode = new AvroResourceNode();
+        avroResourceNode.setNodeId(resourceNode.getNodeId());
+        avroResourceNode.setIpAddr(resourceNode.getIpAddr().toString());
+        avroResourceNode.setCapacity(resourceNode.getCapacity());
+        return avroResourceNode;
+    }
+
+
+    /** Converts each element with {@link #getAvroResourceNode(ResourceNode)}, preserving order. */
+    public static List<AvroResourceNode> getListAvroResourceNode(List<ResourceNode> resourceNodes){
+        List<AvroResourceNode> avroResourceNodes = new ArrayList<AvroResourceNode>(resourceNodes.size());
+
+        for (ResourceNode rn : resourceNodes){
+            avroResourceNodes.add(getAvroResourceNode(rn));
+        }
+        return avroResourceNodes;
+    }
+
+    /** Converts each element with {@link #getResourceNode(AvroResourceNode)}, preserving order. */
+    public static List<ResourceNode> getListResourceNode(List<AvroResourceNode> avroResourceNodes){
+        List<ResourceNode> resourceNodes = new ArrayList<ResourceNode>(avroResourceNodes.size());
+
+        for (AvroResourceNode arn : avroResourceNodes){
+            resourceNodes.add(getResourceNode(arn));
+        }
+
+        return resourceNodes;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
index a7fcb7a..c3cc6fc 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
@@ -114,4 +114,8 @@ public class NameValueJobInput implements JobInput {
     }
   }
 
+  /**
+   * Returns the {@link Properties} object held in this input's
+   * {@code props} field. The field itself is returned, not a copy.
+   *
+   * @return the backing properties of this job input
+   */
+  public Properties getProps(){
+    return this.props;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
new file mode 100644
index 0000000..47ea2df
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.oodt.cas.resource.scheduler.Scheduler;
+import org.apache.oodt.cas.resource.structs.*;
+import org.apache.oodt.cas.resource.structs.avrotypes.*;
+import org.apache.oodt.cas.resource.structs.exceptions.*;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
+import org.apache.xmlrpc.WebServer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Avro RPC server for the resource manager. Implements the generated Avro
+ * {@code ResourceManager} protocol and delegates each operation to the
+ * configured {@link Scheduler} (its job queue, monitor, batch manager and
+ * queue manager). Domain exceptions are wrapped in
+ * {@link AvroRemoteException} before they are propagated to the caller.
+ */
+public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager{
+
+    /* our log stream */
+    private static final Logger LOG = Logger
+            .getLogger(AvroRpcResourceManager.class.getName());
+
+    /* port the Avro server listens on */
+    private int port = 2000;
+
+    /* the Avro server; set to null once shut down */
+    private Server server;
+
+    /* our scheduler */
+    private Scheduler scheduler = null;
+
+    /**
+     * Loads the optional resource manager properties file, creates and starts
+     * the scheduler, and starts an Avro Netty server on the given port.
+     *
+     * @param port the port the Avro server binds to
+     * @throws Exception if the properties file cannot be read or the
+     *         scheduler/server cannot be created
+     */
+    public AvroRpcResourceManager(int port)  throws Exception{
+        // load properties from resource manager properties file, if specified
+        String configFile = System
+                .getProperty("org.apache.oodt.cas.resource.properties");
+        if (configFile != null) {
+            LOG.log(Level.INFO,
+                    "Loading Resource Manager Configuration Properties from: ["
+                            + configFile + "]");
+            FileInputStream configStream = new FileInputStream(new File(configFile));
+            try {
+                System.getProperties().load(configStream);
+            } finally {
+                // always release the file handle, even if load() fails
+                configStream.close();
+            }
+        }
+
+        String schedulerClassStr = System.getProperty(
+                "resource.scheduler.factory",
+                "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory");
+
+        scheduler = GenericResourceManagerObjectFactory
+                .getSchedulerServiceFromFactory(schedulerClassStr);
+
+        // start up the scheduler
+        new Thread(scheduler).start();
+
+        this.port = port;
+
+        // start up the Avro Netty server
+        server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class,this),
+                new InetSocketAddress(this.port));
+        server.start();
+
+        LOG.log(Level.INFO, "Resource Manager started by "
+                + System.getProperty("user.name", "unknown"));
+
+    }
+
+    /** Liveness probe; always returns {@code true}. */
+    @Override
+    public boolean isAlive() throws AvroRemoteException {
+        return true;
+    }
+
+    /**
+     * @return the size reported by the scheduler's job queue
+     * @throws AvroRemoteException wrapping a {@link JobRepositoryException}
+     *         on any failure
+     */
+    @Override
+    public int getJobQueueSize() throws AvroRemoteException {
+        try {
+            return this.scheduler.getJobQueue().getSize();
+        }catch (Exception e) {
+            throw new AvroRemoteException(new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e));
+        }
+    }
+
+
+    /**
+     * @return the capacity reported by the scheduler's job queue
+     * @throws AvroRemoteException wrapping a {@link JobRepositoryException}
+     *         on any failure
+     */
+    @Override
+    public int getJobQueueCapacity() throws AvroRemoteException {
+        try {
+            return this.scheduler.getJobQueue().getCapacity();
+        }catch (Exception e) {
+            throw new AvroRemoteException(new JobRepositoryException("Failed to get capacity of JobQueue : " + e.getMessage(), e));
+        }
+    }
+
+    /**
+     * Looks the job up in the job repository and asks the repository whether
+     * it has finished.
+     *
+     * @param jobId id of the job to check
+     * @return the repository's {@code jobFinished} answer for the job
+     * @throws AvroRemoteException wrapping the {@link JobRepositoryException}
+     */
+    @Override
+    public boolean isJobComplete(String jobId) throws AvroRemoteException {
+        try {
+            JobSpec spec = scheduler.getJobQueue().getJobRepository().getJobById(
+                    jobId);
+            return scheduler.getJobQueue().getJobRepository().jobFinished(spec);
+
+        } catch(JobRepositoryException e ){
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * Fetches the job with the given id from the job repository and returns
+     * it in Avro form.
+     *
+     * @param jobId id of the job to fetch
+     * @return the Avro representation of the job
+     * @throws AvroRemoteException wrapping a {@link JobRepositoryException}
+     *         (with the original failure as its cause) if the lookup fails
+     */
+    @Override
+    public AvroJob getJobInfo(String jobId) throws AvroRemoteException {
+        JobSpec spec = null;
+
+        try {
+            spec = scheduler.getJobQueue().getJobRepository()
+                    .getJobById(jobId);
+        } catch (JobRepositoryException e) {
+            LOG.log(Level.WARNING,
+                    "Exception communicating with job repository for job: ["
+                            + jobId + "]: Message: " + e.getMessage());
+            // keep the original exception as the cause so it is not lost
+            throw new AvroRemoteException(new JobRepositoryException("Unable to get job: [" + jobId
+                    + "] from repository!", e));
+        }
+
+        return AvroTypeFactory.getAvroJob(spec.getJob());
+
+    }
+
+    /**
+     * Queues the given job for scheduling.
+     *
+     * @param exec the job definition
+     * @param into the job input
+     * @return the id the job queue assigned to the job
+     * @throws AvroRemoteException wrapping the {@link SchedulerException}
+     */
+    @Override
+    public String handleJob(AvroJob exec, AvroJobInput into) throws AvroRemoteException {
+        try {
+            return genericHandleJob(exec, into);
+        } catch (SchedulerException e) {
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * Executes the given job directly on the node registered under
+     * {@code hostUrl}, bypassing the job queue.
+     *
+     * @param exec the job definition
+     * @param in the job input
+     * @param hostUrl URL of the node that should run the job
+     * @return the batch manager's result, or {@code false} if no node is
+     *         known for the URL
+     * @throws AvroRemoteException wrapping the {@link JobExecutionException}
+     */
+    @Override
+    public boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, String hostUrl) throws AvroRemoteException {
+        try {
+            return genericHandleJob(exec,in,hostUrl);
+        } catch (JobExecutionException e) {
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * @return all nodes known to the monitor, in Avro form
+     * @throws AvroRemoteException wrapping the {@link MonitorException}
+     */
+    @Override
+    public List<AvroResourceNode> getNodes() throws AvroRemoteException {
+
+        List resNodes = null;
+        try {
+            resNodes = scheduler.getMonitor().getNodes();
+        } catch (MonitorException e) {
+            throw new AvroRemoteException(e);
+        }
+
+        return AvroTypeFactory.getListAvroResourceNode(resNodes);
+    }
+
+    /**
+     * @param nodeId id of the node to look up
+     * @return the node the monitor returns for the id, in Avro form
+     * @throws AvroRemoteException wrapping the {@link MonitorException}
+     */
+    @Override
+    public AvroResourceNode getNodeById(String nodeId) throws AvroRemoteException {
+        ResourceNode node = null;
+        try {
+            node = scheduler.getMonitor().getNodeById(nodeId);
+        } catch (MonitorException e) {
+            throw new AvroRemoteException(e);
+        }
+        return AvroTypeFactory.getAvroResourceNode(node);
+    }
+
+    /**
+     * Asks the batch manager to kill the job on the node it is executing on.
+     *
+     * @param jobId id of the job to kill
+     * @return {@code false} if no execution node is known for the job,
+     *         otherwise the batch manager's result
+     * @throws AvroRemoteException wrapping the {@link MonitorException}
+     */
+    @Override
+    public boolean killJob(String jobId) throws AvroRemoteException {
+        String resNodeId = scheduler.getBatchmgr().getExecutionNode(jobId);
+        if (resNodeId == null) {
+            LOG.log(Level.WARNING, "Attempt to kill job: [" + jobId
+                    + "]: cannot find execution node"
+                    + " (has the job already finished?)");
+            return false;
+        }
+        ResourceNode node = null;
+        try {
+            node = scheduler.getMonitor().getNodeById(resNodeId);
+        } catch (MonitorException e) {
+            throw new AvroRemoteException(e);
+        }
+        return scheduler.getBatchmgr().killJob(jobId, node);
+
+    }
+
+    /**
+     * @param jobId id of the job to look up
+     * @return the id of the node executing the job, or the empty string if
+     *         the batch manager knows of none
+     */
+    @Override
+    public String getExecutionNode(String jobId) throws AvroRemoteException {
+        String execNode = scheduler.getBatchmgr().getExecutionNode(jobId);
+        if (execNode == null) {
+            LOG.log(Level.WARNING, "Job: [" + jobId
+                    + "] not currently executing on any known node");
+            return "";
+        }
+        return execNode;
+    }
+
+    /**
+     * @return the queue names reported by the queue manager
+     * @throws AvroRemoteException wrapping the {@link QueueManagerException}
+     */
+    @Override
+    public List<String> getQueues() throws AvroRemoteException {
+        try {
+            return this.scheduler.getQueueManager().getQueues();
+        } catch (QueueManagerException e) {
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * Adds a queue to the queue manager.
+     *
+     * @param queueName name of the queue to add
+     * @return {@code true} when the queue manager accepted the queue
+     * @throws AvroRemoteException wrapping the {@link QueueManagerException}
+     */
+    @Override
+    public boolean addQueue(String queueName) throws AvroRemoteException {
+        try {
+            this.scheduler.getQueueManager().addQueue(queueName);
+        } catch (QueueManagerException e) {
+            // report the failure to the caller instead of claiming success,
+            // matching removeQueue and the other queue operations
+            throw new AvroRemoteException(e);
+        }
+        return true;
+
+    }
+
+    /**
+     * Removes a queue from the queue manager.
+     *
+     * @param queueName name of the queue to remove
+     * @return {@code true} when the queue manager removed the queue
+     * @throws AvroRemoteException wrapping the {@link QueueManagerException}
+     */
+    @Override
+    public boolean removeQueue(String queueName) throws AvroRemoteException {
+        try {
+            this.scheduler.getQueueManager().removeQueue(queueName);
+        } catch (QueueManagerException e) {
+            throw new AvroRemoteException(e);
+        }
+        return true;
+
+    }
+
+    /**
+     * Registers a node with the monitor.
+     *
+     * @param node the node to add, in Avro form
+     * @return {@code true} when the monitor accepted the node
+     * @throws AvroRemoteException wrapping the {@link MonitorException}
+     */
+    @Override
+    public boolean addNode(AvroResourceNode node) throws AvroRemoteException {
+        try {
+            this.scheduler.getMonitor().addNode(AvroTypeFactory.getResourceNode(node));
+        } catch (MonitorException e) {
+            throw new AvroRemoteException(e);
+        }
+        return true;
+    }
+
+    /**
+     * Removes the node from every queue that contains it and then from the
+     * monitor.
+     *
+     * @param nodeId id of the node to remove
+     * @return {@code true} when the node was removed
+     * @throws AvroRemoteException wrapping a {@link MonitorException} on any
+     *         failure
+     */
+    @Override
+    public boolean removeNode(String nodeId) throws AvroRemoteException {
+        try{
+            for(String queueName: this.getQueuesWithNode(nodeId)){
+                this.removeNodeFromQueue(nodeId, queueName);
+            }
+            this.scheduler.getMonitor().removeNodeById(nodeId);
+        }catch(Exception e){
+            throw new AvroRemoteException(new MonitorException(e.getMessage(), e));
+        }
+
+        return true;
+    }
+
+    /**
+     * @param nodeId id of the node to assign
+     * @param queueName queue the node is added to
+     * @return {@code true} when the queue manager accepted the assignment
+     * @throws AvroRemoteException wrapping the {@link QueueManagerException}
+     */
+    @Override
+    public boolean addNodeToQueue(String nodeId, String queueName) throws AvroRemoteException {
+        try {
+            this.scheduler.getQueueManager().addNodeToQueue(nodeId, queueName);
+        } catch (QueueManagerException e) {
+            throw new AvroRemoteException(e);
+        }
+        return true;
+
+    }
+
+    /**
+     * @param nodeId id of the node to unassign
+     * @param queueName queue the node is removed from
+     * @return {@code true} when the queue manager removed the assignment
+     * @throws AvroRemoteException wrapping the {@link QueueManagerException}
+     */
+    @Override
+    public boolean removeNodeFromQueue(String nodeId, String queueName) throws AvroRemoteException {
+        try {
+            this.scheduler.getQueueManager().removeNodeFromQueue(nodeId, queueName);
+        } catch (QueueManagerException e) {
+            throw new AvroRemoteException(e);
+        }
+        return true;
+
+    }
+
+    /**
+     * @param queueName queue to inspect
+     * @return the node ids the queue manager reports for the queue
+     * @throws AvroRemoteException wrapping the {@link QueueManagerException}
+     */
+    @Override
+    public List<String> getNodesInQueue(String queueName) throws AvroRemoteException {
+        try {
+            return this.scheduler.getQueueManager().getNodes(queueName);
+        } catch (QueueManagerException e) {
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * @param nodeId node to inspect
+     * @return the queue names the queue manager reports for the node
+     * @throws AvroRemoteException wrapping the {@link QueueManagerException}
+     */
+    @Override
+    public List<String> getQueuesWithNode(String nodeId) throws AvroRemoteException {
+        try {
+            return this.scheduler.getQueueManager().getQueues(nodeId);
+        } catch (QueueManagerException e) {
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * Closes the Avro server if it is still held.
+     *
+     * @return {@code true} if a server was closed, {@code false} if it had
+     *         already been shut down
+     */
+    public boolean shutdown(){
+        if (this.server == null) {
+            return false;
+        }
+        this.server.close();
+        this.server = null;
+        return true;
+    }
+
+    /**
+     * Reports the load of a node as {@code "<load>/<capacity>"}, where the
+     * load is the node capacity minus the value the monitor returns from
+     * {@code getLoad}.
+     *
+     * @param nodeId id of the node to inspect
+     * @return the formatted load string
+     * @throws AvroRemoteException wrapping the {@link MonitorException}
+     */
+    @Override
+    public String getNodeLoad(String nodeId) throws AvroRemoteException {
+        ResourceNode node = null;
+        try {
+            node = this.scheduler.getMonitor().getNodeById(nodeId);
+            int capacity = node.getCapacity();
+            int load = (this.scheduler.getMonitor().getLoad(node)) * -1 + capacity;
+            return load + "/" + capacity;
+        } catch (MonitorException e) {
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * Command line entry point: starts a resource manager on the port given
+     * with {@code --portNum} and then blocks forever.
+     *
+     * @param args command line arguments
+     * @throws Exception if the resource manager cannot be started
+     */
+    public static void main(String[] args) throws Exception {
+        int portNum = -1;
+        String usage = "AvroRpcResourceManager --portNum <port number for avro rpc service>\n";
+
+        for (int i = 0; i < args.length; i++) {
+            // only consume a value when one actually follows the flag, so a
+            // trailing "--portNum" prints the usage instead of crashing
+            if (args[i].equals("--portNum") && i + 1 < args.length) {
+                portNum = Integer.parseInt(args[++i]);
+            }
+        }
+
+        if (portNum == -1) {
+            System.err.println(usage);
+            System.exit(1);
+        }
+
+        // the constructor starts the scheduler and the server
+        new AvroRpcResourceManager(portNum);
+
+        // park the main thread for the lifetime of the process
+        for (;;) {
+            try {
+                Thread.currentThread().join();
+            } catch (InterruptedException ignore) {
+                // deliberately ignored: keep the main thread parked
+            }
+        }
+    }
+
+
+    /**
+     * Sets the capacity on the node the monitor returns for {@code nodeId}.
+     *
+     * @param nodeId id of the node to update
+     * @param capacity the new capacity
+     * @return {@code true} on success, {@code false} if the monitor lookup
+     *         threw a {@link MonitorException}
+     */
+    @Override
+    public boolean setNodeCapacity(String nodeId, int capacity) throws AvroRemoteException {
+        try{
+            this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity);
+        }catch (MonitorException e){
+            LOG.log(Level.WARNING, "Exception setting capacity on node "
+                    + nodeId + ": " + e.getMessage());
+            return false;
+        }
+        return true;
+
+    }
+
+
+    /**
+     * Converts the Avro job and input to their native forms and adds the
+     * resulting {@link JobSpec} to the scheduler's job queue.
+     *
+     * @return the job id returned by the job queue
+     * @throws SchedulerException if the job queue rejects the job
+     */
+    private String genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput)
+            throws SchedulerException {
+
+        Job exec = AvroTypeFactory.getJob(avroJob);
+        JobInput in = AvroTypeFactory.getJobInput(avroJobInput);
+        JobSpec spec = new JobSpec(in, exec);
+
+        // queue the job up
+        String jobId = null;
+
+        try {
+            jobId = scheduler.getJobQueue().addJob(spec);
+        } catch (JobQueueException e) {
+            LOG.log(Level.WARNING, "JobQueue exception adding job: Message: "
+                    + e.getMessage());
+            throw new SchedulerException(e.getMessage());
+        }
+        return jobId;
+    }
+
+    /**
+     * Converts the Avro job and input to their native forms and executes the
+     * job on the node the monitor has registered under {@code urlStr}.
+     *
+     * @return the batch manager's result, or {@code false} if the URL is
+     *         malformed or no node is registered for it
+     * @throws JobExecutionException if the batch manager fails to run the job
+     */
+    private boolean genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput,
+                                     String urlStr) throws JobExecutionException {
+        Job exec = AvroTypeFactory.getJob(avroJob);
+        JobInput in = AvroTypeFactory.getJobInput(avroJobInput);
+
+        JobSpec spec = new JobSpec(in, exec);
+
+        URL remoteUrl = safeGetUrlFromString(urlStr);
+        if (remoteUrl == null) {
+            // malformed URL (already logged); there is no node to look up
+            return false;
+        }
+
+        ResourceNode remoteNode = null;
+
+        try {
+            remoteNode = scheduler.getMonitor().getNodeByURL(remoteUrl);
+        } catch (MonitorException e) {
+            // treated as "no such node" below, but leave a trace of why
+            LOG.log(Level.WARNING, "Unable to look up node for URL: ["
+                    + urlStr + "]: Message: " + e.getMessage());
+        }
+
+        if (remoteNode == null) {
+            return false;
+        }
+        return scheduler.getBatchmgr().executeRemotely(spec, remoteNode);
+    }
+
+    /**
+     * Parses {@code urlStr} into a {@link URL}.
+     *
+     * @return the parsed URL, or {@code null} (after logging a warning) if
+     *         the string is not a valid URL
+     */
+    private URL safeGetUrlFromString(String urlStr) {
+        URL url = null;
+
+        try {
+            url = new URL(urlStr);
+        } catch (MalformedURLException e) {
+            LOG.log(Level.WARNING, "Error converting string: [" + urlStr
+                    + "] to URL object: Message: " + e.getMessage());
+        }
+
+        return url;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
new file mode 100644
index 0000000..fa0e84b
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.oodt.cas.cli.CmdLineUtility;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class AvroRpcResourceManagerClient implements ResourceManagerClient {
+
+    /* our log stream */
+    private static Logger LOG = Logger
+            .getLogger(XmlRpcResourceManagerClient.class.getName());
+
+    /* resource manager url */
+    private URL resMgrUrl = null;
+
+    Transceiver client;
+    ResourceManager proxy;
+
+    public AvroRpcResourceManagerClient(URL url) {
+        // set up the configuration, if there is any
+        if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) {
+            String configFile = System
+                    .getProperty("org.apache.oodt.cas.resource.properties");
+            LOG.log(Level.INFO,
+                    "Loading Resource Manager Configuration Properties from: ["
+                            + configFile + "]");
+            try {
+                System.getProperties().load(
+                        new FileInputStream(new File(configFile)));
+            } catch (Exception e) {
+                LOG.log(Level.INFO,
+                        "Error loading configuration properties from: ["
+                                + configFile + "]");
+            }
+        }
+
+        try {
+            this.client = new NettyTransceiver(new InetSocketAddress(url.getPort()));
+            proxy = (ResourceManager) SpecificRequestor.getClient(ResourceManager.class, client);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    public static void main(String[] args) {
+        CmdLineUtility cmdLineUtility = new CmdLineUtility();
+        cmdLineUtility.run(args);
+    }
+
+
+    @Override
+    public boolean isJobComplete(String jobId) throws JobRepositoryException {
+        try {
+            return proxy.isJobComplete(jobId);
+        } catch (AvroRemoteException e) {
+            throw new JobRepositoryException(e);
+        }
+    }
+
+    @Override
+    public Job getJobInfo(String jobId) throws JobRepositoryException {
+        try {
+            return AvroTypeFactory.getJob(proxy.getJobInfo(jobId));
+        } catch (AvroRemoteException e) {
+            throw new JobRepositoryException(e);
+        }
+    }
+
+    @Override
+    public boolean isAlive() {
+        try {
+            return proxy.isAlive();
+        } catch (AvroRemoteException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    @Override
+    public int getJobQueueSize() throws JobRepositoryException {
+        try {
+            return proxy.getJobQueueSize();
+        } catch (AvroRemoteException e) {
+            throw new JobRepositoryException(e);
+        }
+    }
+
+    @Override
+    public int getJobQueueCapacity() throws JobRepositoryException {
+        try {
+            return proxy.getJobQueueCapacity();
+        } catch (AvroRemoteException e) {
+            throw new JobRepositoryException(e);
+        }
+    }
+
+    @Override
+    public boolean killJob(String jobId) {
+        try {
+            return proxy.killJob(jobId);
+        } catch (AvroRemoteException e) {
+            LOG.log(Level.SEVERE,
+                    "Server error!");
+        }
+        return false;
+    }
+
+    @Override
+    public String getExecutionNode(String jobId) {
+        try {
+            return proxy.getExecutionNode(jobId);
+        } catch (AvroRemoteException e) {
+            LOG.log(Level.SEVERE,
+                    "Server error!");
+        }
+        return null;
+    }
+
+    @Override
+    public String submitJob(Job exec, JobInput in) throws JobExecutionException {
+        try {
+            return proxy.handleJob(AvroTypeFactory.getAvroJob(exec),AvroTypeFactory.getAvroJobInput(in));
+        } catch (AvroRemoteException e) {
+            LOG.log(Level.SEVERE,
+                    "Server error!");
+
+        }
+        return null;
+    }
+
+    @Override
+    public boolean submitJob(Job exec, JobInput in, URL hostUrl) throws JobExecutionException {
+        try {
+            return proxy.handleJobWithUrl(AvroTypeFactory.getAvroJob(exec), AvroTypeFactory.getAvroJobInput(in), hostUrl.toString());
+        } catch (AvroRemoteException e) {
+            throw new JobExecutionException(e);
+        }
+    }
+
+    @Override
+    public List getNodes() throws MonitorException {
+        try {
+            return AvroTypeFactory.getListResourceNode(proxy.getNodes());
+        } catch (AvroRemoteException e) {
+            throw new MonitorException(e);
+        }
+    }
+
+    @Override
+    public ResourceNode getNodeById(String nodeId) throws MonitorException {
+        try {
+            return AvroTypeFactory.getResourceNode(proxy.getNodeById(nodeId));
+        } catch (AvroRemoteException e) {
+            throw new MonitorException(e);
+        }
+    }
+
+    @Override
+    public URL getResMgrUrl() {
+        return this.resMgrUrl;
+    }
+
+    @Override
+    public void setResMgrUrl(URL resMgrUrl) {
+        this.resMgrUrl = resMgrUrl;
+    }
+
+    @Override
+    public void addQueue(String queueName) throws QueueManagerException {
+        try {
+            proxy.addQueue(queueName);
+        } catch (AvroRemoteException e) {
+            throw new QueueManagerException(e);
+        }
+    }
+
+    @Override
+    public void removeQueue(String queueName) throws QueueManagerException {
+        try {
+            proxy.removeQueue(queueName);
+        } catch (AvroRemoteException e) {
+            throw new QueueManagerException(e);
+        }
+
+    }
+
+    @Override
+    public void addNode(ResourceNode node) throws MonitorException {
+        try {
+            proxy.addNode(AvroTypeFactory.getAvroResourceNode(node));
+        } catch (AvroRemoteException e) {
+            throw new MonitorException(e);
+        }
+    }
+
+    @Override
+    public void removeNode(String nodeId) throws MonitorException {
+        try {
+            proxy.removeNode(nodeId);
+        } catch (AvroRemoteException e) {
+            throw new MonitorException(e);
+        }
+    }
+
+    @Override
+    public void setNodeCapacity(String nodeId, int capacity) throws MonitorException {
+        try {
+            proxy.setNodeCapacity(nodeId,capacity);
+        } catch (AvroRemoteException e) {
+            throw new MonitorException(e);
+        }
+    }
+
+    @Override
+    public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
+        try {
+            proxy.addNodeToQueue(nodeId,queueName);
+        } catch (AvroRemoteException e) {
+            throw new QueueManagerException(e);
+        }
+    }
+
+    @Override
+    public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
+        try {
+            proxy.removeNodeFromQueue(nodeId,queueName);
+        } catch (AvroRemoteException e) {
+            throw new QueueManagerException(e);
+        }
+    }
+
+    @Override
+    public List<String> getQueues() throws QueueManagerException {
+        try {
+            return proxy.getQueues();
+        } catch (AvroRemoteException e) {
+            throw new QueueManagerException(e);
+        }
+    }
+
+    @Override
+    public List<String> getNodesInQueue(String queueName) throws QueueManagerException {
+        try {
+            return proxy.getNodesInQueue(queueName);
+        } catch (AvroRemoteException e) {
+            throw new QueueManagerException(e);
+        }
+    }
+
+    @Override
+    public List<String> getQueuesWithNode(String nodeId) throws QueueManagerException {
+        try {
+            return proxy.getQueuesWithNode(nodeId);
+        } catch (AvroRemoteException e) {
+            throw new QueueManagerException(e);
+        }
+    }
+
+    @Override
+    public String getNodeLoad(String nodeId) throws MonitorException {
+        try {
+            return proxy.getNodeLoad(nodeId);
+        } catch (AvroRemoteException e) {
+            throw new MonitorException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
new file mode 100644
index 0000000..5cbf6d3
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.oodt.cas.resource.structs.exceptions.*;
+
+import java.util.Date;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Server-side contract for a resource manager: the only operation it
+ * declares is {@link #shutdown()}.
+ */
+public interface ResourceManager {
+
+        /**
+         * Shuts the resource manager down.
+         *
+         * @return a success flag. NOTE(review): the Avro implementation
+         *         returns {@code true} when it closed a running server and
+         *         {@code false} when there was none; confirm other
+         *         implementations follow the same convention.
+         */
+        boolean shutdown();
+
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
new file mode 100644
index 0000000..dd4444b
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Transport-independent client contract for talking to a resource manager.
+ * The operations cover job submission and status, node management and queue
+ * management; failures are reported through the exception type declared on
+ * each method.
+ */
+public interface ResourceManagerClient {
+    /** Asks the resource manager whether the job with the given id is complete. */
+    boolean isJobComplete(String jobId) throws JobRepositoryException;
+
+    /** Retrieves the {@link Job} stored under the given job id. */
+    Job getJobInfo(String jobId) throws JobRepositoryException;
+
+    /** Liveness probe for the remote resource manager. */
+    boolean isAlive();
+
+    /** Returns the current size of the resource manager's job queue. */
+    int getJobQueueSize() throws JobRepositoryException;
+
+    /** Returns the capacity of the resource manager's job queue. */
+    int getJobQueueCapacity() throws JobRepositoryException;
+
+    /** Requests that the job with the given id be killed; returns a success flag. */
+    boolean killJob(String jobId);
+
+    /** Returns the node the given job is executing on, as reported by the server. */
+    String getExecutionNode(String jobId);
+
+    /**
+     * Submits a job with its input to the resource manager.
+     *
+     * @return a String result from the server; presumably the id assigned
+     *         to the job (the Avro server returns the job queue's id)
+     */
+    String submitJob(Job exec, JobInput in) throws JobExecutionException;
+
+    /**
+     * Submits a job with its input for execution on the host identified by
+     * {@code hostUrl}.
+     *
+     * @return a success flag from the server
+     */
+    boolean submitJob(Job exec, JobInput in, URL hostUrl)
+            throws JobExecutionException;
+
+    /**
+     * Returns the nodes known to the resource manager. The list is raw;
+     * the Avro client fills it with {@link ResourceNode} objects.
+     */
+    List getNodes() throws MonitorException;
+
+    /** Returns the node registered under the given id. */
+    ResourceNode getNodeById(String nodeId) throws MonitorException;
+
+    /** Returns the resource manager URL held by this client. */
+    URL getResMgrUrl();
+
+    /** Sets the resource manager URL held by this client. */
+    void setResMgrUrl(URL resMgrUrl);
+
+    /** Adds a queue with the given name. */
+    void addQueue(String queueName) throws QueueManagerException;
+
+    /** Removes the queue with the given name. */
+    void removeQueue(String queueName) throws QueueManagerException;
+
+    /** Registers the given node with the resource manager. */
+    void addNode(ResourceNode node) throws MonitorException;
+
+    /** Removes the node with the given id. */
+    void removeNode(String nodeId) throws MonitorException;
+
+    /** Sets the capacity of the node with the given id. */
+    void setNodeCapacity(String nodeId, int capacity) throws MonitorException;
+
+    /** Assigns the node with the given id to the named queue. */
+    void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException;
+
+    /** Removes the node with the given id from the named queue. */
+    void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException;
+
+    /** Returns the names of all queues. */
+    List<String> getQueues() throws QueueManagerException;
+
+    /** Returns the ids of the nodes assigned to the named queue. */
+    List<String> getNodesInQueue(String queueName) throws QueueManagerException;
+
+    /** Returns the names of the queues the given node is assigned to. */
+    List<String> getQueuesWithNode(String nodeId) throws QueueManagerException;
+
+    /**
+     * Returns the load of the given node as a String; the Avro server
+     * formats it as {@code "<load>/<capacity>"}.
+     */
+    String getNodeLoad(String nodeId) throws MonitorException;
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
index 0fb0520..9110807 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
@@ -56,7 +56,7 @@ import java.io.IOException;
  * </p>
  * 
  */
-public class XmlRpcResourceManagerClient {
+public class XmlRpcResourceManagerClient implements ResourceManagerClient {
 
     /* our xml rpc client */
     private XmlRpcClient client = null;
@@ -119,6 +119,7 @@ public class XmlRpcResourceManagerClient {
        cmdLineUtility.run(args);
     }
 
+    @Override
     public boolean isJobComplete(String jobId) throws JobRepositoryException {
         Vector argList = new Vector();
         argList.add(jobId);
@@ -137,6 +138,7 @@ public class XmlRpcResourceManagerClient {
         return complete;
     }
 
+    @Override
     public Job getJobInfo(String jobId) throws JobRepositoryException {
         Vector argList = new Vector();
         argList.add(jobId);
@@ -155,6 +157,7 @@ public class XmlRpcResourceManagerClient {
         return XmlRpcStructFactory.getJobFromXmlRpc(jobHash);
     }
 
+    @Override
     public boolean isAlive() {
         Vector argList = new Vector();
 
@@ -174,6 +177,7 @@ public class XmlRpcResourceManagerClient {
      * @return Number of Jobs in JobQueue
      * @throws JobRepositoryException On Any Exception
      */
+    @Override
     public int getJobQueueSize() throws JobRepositoryException {
         try {
             Vector argList = new Vector();
@@ -188,6 +192,7 @@ public class XmlRpcResourceManagerClient {
      * @return Max number of Jobs
      * @throws JobRepositoryException On Any Exception
      */
+    @Override
     public int getJobQueueCapacity() throws JobRepositoryException {
         try {
             Vector argList = new Vector();
@@ -197,6 +202,7 @@ public class XmlRpcResourceManagerClient {
         }
     }
     
+    @Override
     public boolean killJob(String jobId) {
         Vector argList = new Vector();
         argList.add(jobId);
@@ -211,6 +217,7 @@ public class XmlRpcResourceManagerClient {
         }
     }
 
+    @Override
     public String getExecutionNode(String jobId) {
         Vector argList = new Vector();
         argList.add(jobId);
@@ -224,6 +231,7 @@ public class XmlRpcResourceManagerClient {
         }
     }
 
+    @Override
     public String submitJob(Job exec, JobInput in) throws JobExecutionException {
         Vector argList = new Vector();
         argList.add(XmlRpcStructFactory.getXmlRpcJob(exec));
@@ -245,6 +253,7 @@ public class XmlRpcResourceManagerClient {
 
     }
 
+    @Override
     public boolean submitJob(Job exec, JobInput in, URL hostUrl)
             throws JobExecutionException {
         Vector argList = new Vector();
@@ -267,6 +276,7 @@ public class XmlRpcResourceManagerClient {
 
     }
 
+    @Override
     public List getNodes() throws MonitorException {
         Vector argList = new Vector();
 
@@ -285,6 +295,7 @@ public class XmlRpcResourceManagerClient {
 
     }
 
+    @Override
     public ResourceNode getNodeById(String nodeId) throws MonitorException {
         Vector argList = new Vector();
         argList.add(nodeId);
@@ -307,6 +318,7 @@ public class XmlRpcResourceManagerClient {
     /**
      * @return the resMgrUrl
      */
+    @Override
     public URL getResMgrUrl() {
         return resMgrUrl;
     }
@@ -315,6 +327,7 @@ public class XmlRpcResourceManagerClient {
      * @param resMgrUrl
      *            the resMgrUrl to set
      */
+    @Override
     public void setResMgrUrl(URL resMgrUrl) {
         this.resMgrUrl = resMgrUrl;
     }
@@ -324,6 +337,7 @@ public class XmlRpcResourceManagerClient {
      * @param queueName The name of the queue to be created
      * @throws QueueManagerException on any error
      */
+    @Override
     public void addQueue(String queueName) throws QueueManagerException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -339,6 +353,7 @@ public class XmlRpcResourceManagerClient {
      * @param queueName The name of the queue to be removed
      * @throws QueueManagerException on any error
      */
+    @Override
     public void removeQueue(String queueName) throws QueueManagerException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -354,6 +369,7 @@ public class XmlRpcResourceManagerClient {
      * @param node The node to be added
      * @throws MonitorException on any error
      */
+    @Override
     public void addNode(ResourceNode node) throws MonitorException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -369,6 +385,7 @@ public class XmlRpcResourceManagerClient {
      * @param nodeId The id of the node to be removed
      * @throws MonitorException on any error
      */
+    @Override
     public void removeNode(String nodeId) throws MonitorException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -379,6 +396,7 @@ public class XmlRpcResourceManagerClient {
         }
     }
     
+    @Override
     public void setNodeCapacity(String nodeId, int capacity) throws MonitorException{
     	try{
     		Vector<Object> argList = new Vector<Object>();
@@ -396,6 +414,7 @@ public class XmlRpcResourceManagerClient {
      * @param queueName The name of the queue to add the given node
      * @throws QueueManagerException on any error
      */
+    @Override
     public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -413,6 +432,7 @@ public class XmlRpcResourceManagerClient {
      * @param queueName The name of the queue from which to remove the given node
      * @throws QueueManagerException on any error
      */
+    @Override
     public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -429,6 +449,7 @@ public class XmlRpcResourceManagerClient {
      * @return A list of currently supported queue names
      * @throws QueueManagerException on any error
      */
+    @Override
     public List<String> getQueues() throws QueueManagerException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -444,6 +465,7 @@ public class XmlRpcResourceManagerClient {
      * @return List of node ids in the given queueName
      * @throws QueueManagerException on any error
      */
+    @Override
     public List<String> getNodesInQueue(String queueName) throws QueueManagerException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -460,6 +482,7 @@ public class XmlRpcResourceManagerClient {
      * @return List of queues which contain the give node
      * @throws QueueManagerException on any error
      */
+    @Override
     public List<String> getQueuesWithNode(String nodeId) throws QueueManagerException {
         try {
             Vector<Object> argList = new Vector<Object>();
@@ -476,6 +499,7 @@ public class XmlRpcResourceManagerClient {
      * @return A String showing a fraction of the loads node over its capacity
      * @throws MonitorException on any error
      */
+    @Override
     public String getNodeLoad(String nodeId) throws MonitorException{
     	try{
 	    	Vector<Object> argList = new Vector<Object>();

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java
new file mode 100644
index 0000000..2d55d19
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system.extern;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroIntrBatchmgr;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJob;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJobInput;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
+import org.apache.xmlrpc.WebServer;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Avro RPC batch stub: serves the {@link AvroIntrBatchmgr} protocol on a local
+ * port and runs each submitted job on its own thread so that it can later be
+ * interrupted through {@link #killJob(AvroJob)}.
+ */
+public class AvroRpcBatchStub implements AvroIntrBatchmgr {
+
+    /* the port to run the Avro RPC server on, default is 2000 */
+    private int port = 2000;
+
+    /* our avro rpc server */
+    final Server server;
+
+    /* our log stream */
+    private static final Logger LOG = Logger.getLogger(AvroRpcBatchStub.class
+            .getName());
+
+    /*
+     * threads of the jobs currently being executed, keyed by job id; created
+     * eagerly so a request arriving right after server start never sees a null
+     * map, and only accessed while holding its monitor
+     */
+    private static final Map<String, Thread> jobThreadMap = new HashMap<String, Thread>();
+
+    /**
+     * Starts an Avro Netty server for this stub on the given port.
+     *
+     * @param port the port to listen on
+     * @throws Exception whatever server construction or start-up raises
+     */
+    public AvroRpcBatchStub(int port) throws Exception {
+        this.port = port;
+
+        // start up the avro rpc server
+        server = new NettyServer(new SpecificResponder(AvroIntrBatchmgr.class, this),
+                new InetSocketAddress(this.port));
+        server.start();
+
+        LOG.log(Level.INFO, "AvroRpc Batch Stub started by "
+                + System.getProperty("user.name", "unknown"));
+    }
+
+    /** Liveness probe: always answers {@code true}. */
+    @Override
+    public boolean isAlive() throws AvroRemoteException {
+        return true;
+    }
+
+    /**
+     * Runs the given job with the given input and blocks until it has ended.
+     *
+     * @return the job's own success flag, or {@code false} if it could not be run
+     * @throws AvroRemoteException wrapping any {@link JobException}
+     */
+    @Override
+    public boolean executeJob(AvroJob avroJob, AvroJobInput jobInput) throws AvroRemoteException {
+        try {
+            return genericExecuteJob(avroJob, jobInput);
+        } catch (JobException e) {
+            throw new AvroRemoteException(e);
+        }
+    }
+
+    /**
+     * Interrupts the thread running the given job, if this stub is tracking it.
+     *
+     * @return {@code false} when the job id is unknown here, {@code true} once
+     *         the interrupt has been requested
+     */
+    @Override
+    public boolean killJob(AvroJob jobHash) throws AvroRemoteException {
+        Job job = AvroTypeFactory.getJob(jobHash);
+        Thread jobThread;
+        synchronized (jobThreadMap) {
+            jobThread = jobThreadMap.get(job.getId());
+        }
+        if (jobThread == null) {
+            LOG.log(Level.WARNING, "Job: [" + job.getId()
+                    + "] not managed by this batch stub");
+            return false;
+        }
+
+        // okay, so interrupt it, which should cause it to stop
+        jobThread.interrupt();
+        return true;
+    }
+
+    /**
+     * Instantiates the job's instance class, runs it on a dedicated thread and
+     * waits for that thread to end.
+     *
+     * @return whether the job reported success; {@code false} on any failure
+     * @throws JobException never raised by this body; kept for the caller's catch
+     */
+    private boolean genericExecuteJob(AvroJob avroJob, AvroJobInput jobInput)
+            throws JobException {
+        String jobId = null;
+        Thread threadRunner = null;
+        try {
+            Job job = AvroTypeFactory.getJob(avroJob);
+            jobId = job.getId();
+
+            LOG.log(Level.INFO, "stub attempting to execute class: ["
+                    + job.getJobInstanceClassName() + "]");
+
+            JobInstance exec = GenericResourceManagerObjectFactory
+                    .getJobInstanceFromClassName(job.getJobInstanceClassName());
+            JobInput in = AvroTypeFactory.getJobInput(jobInput);
+
+            // create threaded job
+            // so that it can be interrupted
+            RunnableJob runner = new RunnableJob(exec, in);
+            threadRunner = new Thread(runner);
+            /* save this job thread in a map so we can kill it later */
+            synchronized (jobThreadMap) {
+                jobThreadMap.put(jobId, threadRunner);
+            }
+            threadRunner.start();
+
+            try {
+                threadRunner.join();
+            } catch (InterruptedException e) {
+                LOG.log(Level.INFO, "Current job: [" + job.getName()
+                        + "]: killed: exiting gracefully");
+                return false;
+            }
+
+            return runner.wasSuccessful();
+        } catch (Exception e) {
+            LOG.log(Level.WARNING, "Job execution failed: " + e.getMessage(), e);
+            return false;
+        } finally {
+            // forget the thread once we stop waiting on it so the map does not
+            // grow with every job; identity-checked in case the id was reused
+            if (threadRunner != null) {
+                synchronized (jobThreadMap) {
+                    if (jobThreadMap.get(jobId) == threadRunner) {
+                        jobThreadMap.remove(jobId);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Runs one job instance and records whether it reported success. */
+    private static class RunnableJob implements Runnable {
+
+        private final JobInput in;
+
+        private final JobInstance job;
+
+        private boolean successful;
+
+        public RunnableJob(JobInstance job, JobInput in) {
+            this.job = job;
+            this.in = in;
+            this.successful = false;
+        }
+
+        /*
+         * (non-Javadoc)
+         *
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            try {
+                this.successful = job.execute(in);
+            } catch (JobInputException e) {
+                LOG.log(Level.WARNING, "Job input rejected: " + e.getMessage(), e);
+                this.successful = false;
+            }
+        }
+
+        public boolean wasSuccessful() {
+            return this.successful;
+        }
+    }
+
+    /**
+     * Command line entry point; {@code --portNum <port>} is required.
+     */
+    public static void main(String[] args) throws Exception {
+        int portNum = -1;
+        String usage = "AvroRpcBatchStub --portNum <port number for avro rpc service>\n";
+
+        // stop one short of the end so a trailing "--portNum" with no value
+        // falls through to the usage message instead of overrunning args
+        for (int i = 0; i < args.length - 1; i++) {
+            if (args[i].equals("--portNum")) {
+                portNum = Integer.parseInt(args[++i]);
+            }
+        }
+
+        if (portNum == -1) {
+            System.err.println(usage);
+            System.exit(1);
+        }
+
+        // start the Avro RPC stub on the requested port
+        new AvroRpcBatchStub(portNum);
+
+        for (;;) {
+            try {
+                Thread.currentThread().join();
+            } catch (InterruptedException ignore) {
+                // deliberately ignored: keep the process alive
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java b/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java
new file mode 100644
index 0000000..0fa28ef
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import junit.framework.TestCase;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.system.extern.AvroRpcBatchStub;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Smoke test: starts an {@link AvroRpcBatchStub} locally and checks that a
+ * proxy created for that node reports it as alive.
+ */
+public class TestBatchMgr extends TestCase {
+
+    /* port the stub listens on; must match the node URL below */
+    private static final int STUB_PORT = 50001;
+
+    public void testAvroBatchMgr() {
+        AvroRpcBatchMgrFactory avroRpcBatchMgrFactory = new AvroRpcBatchMgrFactory();
+        Batchmgr batchmgr = avroRpcBatchMgrFactory.createBatchmgr();
+        assertNotNull(batchmgr);
+
+        try {
+            new AvroRpcBatchStub(STUB_PORT);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+        ResourceNode resNode = new ResourceNode();
+        try {
+            // the scheme separator must be "://" or URL rejects the spec
+            resNode.setIpAddr(new URL("http://localhost:" + STUB_PORT));
+        } catch (MalformedURLException e) {
+            fail(e.getMessage());
+        }
+
+        // hand the proxy the node that actually points at the running stub
+        AvroRpcBatchMgrProxy bmc = new AvroRpcBatchMgrProxy(new JobSpec(), resNode,
+                (AvroRpcBatchMgr) batchmgr);
+
+        assertTrue(bmc.nodeAlive());
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java b/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java
new file mode 100644
index 0000000..1ad4e18
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.structs;
+
+import junit.framework.TestCase;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Properties;
+
+/**
+ * Round-trip tests: each resource manager type is converted to its Avro form
+ * and back through {@link AvroTypeFactory}, then compared with the original.
+ */
+public class TestAvroTypeFactory extends TestCase {
+
+    public void testAvroJobInput() {
+        JobInput jobInput = GenericResourceManagerObjectFactory
+                .getJobInputFromClassName("org.apache.oodt.cas.resource.structs.NameValueJobInput");
+
+        assertNotNull(jobInput);
+        Properties properties = new Properties();
+        properties.setProperty("key", "prop1");
+        jobInput.configure(properties);
+
+        JobInput afterJobInput = AvroTypeFactory.getJobInput(AvroTypeFactory.getAvroJobInput(jobInput));
+
+        assertNotNull(afterJobInput);
+        assertEquals(jobInput.getId(), afterJobInput.getId());
+    }
+
+    public void testAvroJob() {
+        Job initJob = new Job();
+
+        initJob.setId("id");
+        initJob.setJobInputClassName("classname");
+        initJob.setJobInstanceClassName("instClassName");
+        initJob.setLoadValue(42);
+        initJob.setQueueName("queueName");
+        initJob.setStatus("status");
+        initJob.setName("name");
+
+        Job afterJob = AvroTypeFactory.getJob(AvroTypeFactory.getAvroJob(initJob));
+
+        assertEquals("id", afterJob.getId());
+        assertEquals("classname", afterJob.getJobInputClassName());
+        assertEquals("instClassName", afterJob.getJobInstanceClassName());
+        assertEquals(Integer.valueOf(42), afterJob.getLoadValue());
+        assertEquals("name", afterJob.getName());
+        assertEquals("queueName", afterJob.getQueueName());
+        assertEquals("status", afterJob.getStatus());
+    }
+
+    public void testNameValueJobInput() {
+        NameValueJobInput initNameValueJobInput = new NameValueJobInput();
+
+        initNameValueJobInput.setNameValuePair("name", "value");
+
+        NameValueJobInput afterNameValueJobInput = (NameValueJobInput) AvroTypeFactory.getJobInput(
+                AvroTypeFactory.getAvroJobInput(initNameValueJobInput));
+
+        assertEquals(initNameValueJobInput.getId(), afterNameValueJobInput.getId());
+        assertEquals("value", afterNameValueJobInput.getProps().getProperty("name"));
+    }
+
+    public void testAvroResourceNode() {
+        ResourceNode initResourceNode = new ResourceNode();
+
+        initResourceNode.setCapacity(42);
+        initResourceNode.setId("id");
+
+        try {
+            // the scheme separator must be "://" or URL rejects the spec
+            initResourceNode.setIpAddr(new URL("http://localhost"));
+        } catch (MalformedURLException e) {
+            fail(e.getMessage());
+        }
+
+        ResourceNode afterResourceNode = AvroTypeFactory.getResourceNode(
+                AvroTypeFactory.getAvroResourceNode(initResourceNode));
+
+        assertEquals(initResourceNode.getCapacity(), afterResourceNode.getCapacity());
+        assertEquals(initResourceNode.getIpAddr(), afterResourceNode.getIpAddr());
+        assertEquals(initResourceNode.getNodeId(), afterResourceNode.getNodeId());
+    }
+}