You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2017/10/15 20:34:14 UTC
[2/7] oodt git commit: avro rpc implementation
avro rpc implementation
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/287d4e89
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/287d4e89
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/287d4e89
Branch: refs/heads/development
Commit: 287d4e8979e9c7d54648688d06374e8d8a2dddc4
Parents: f91720d
Author: Radu Manole <ma...@gmail.com>
Authored: Mon Aug 17 18:11:09 2015 +0300
Committer: Radu Manole <ma...@gmail.com>
Committed: Mon Aug 17 18:11:09 2015 +0300
----------------------------------------------------------------------
resource/pom.xml | 52 +++
resource/src/main/avro/types/AvroJob.avsc | 17 +
resource/src/main/avro/types/AvroJobInput.avsc | 11 +
.../main/avro/types/AvroNameValueJobInput.avsc | 10 +
.../src/main/avro/types/AvroResourceNode.avsc | 11 +
.../avro/types/resource_manager_protocol.avdl | 53 +++
.../src/main/avro/types/tatchmgr_protocol.avdl | 27 ++
.../cas/resource/batchmgr/AvroRpcBatchMgr.java | 180 ++++++++
.../batchmgr/AvroRpcBatchMgrFactory.java | 32 ++
.../resource/batchmgr/AvroRpcBatchMgrProxy.java | 135 ++++++
.../cas/resource/structs/AvroTypeFactory.java | 168 ++++++++
.../cas/resource/structs/NameValueJobInput.java | 4 +
.../resource/system/AvroRpcResourceManager.java | 425 +++++++++++++++++++
.../system/AvroRpcResourceManagerClient.java | 305 +++++++++++++
.../cas/resource/system/ResourceManager.java | 31 ++
.../resource/system/ResourceManagerClient.java | 80 ++++
.../system/XmlRpcResourceManagerClient.java | 26 +-
.../system/extern/AvroRpcBatchStub.java | 212 +++++++++
.../cas/resource/batchmgr/TestBatchMgr.java | 54 +++
.../resource/structs/TestAvroTypeFactory.java | 112 +++++
.../system/TestAvroRpcResourceManager.java | 159 +++++++
.../system/TestXmlRpcResourceManager.java | 4 +
22 files changed, 2107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/pom.xml
----------------------------------------------------------------------
diff --git a/resource/pom.xml b/resource/pom.xml
index de807d6..b91fa84 100644
--- a/resource/pom.xml
+++ b/resource/pom.xml
@@ -35,6 +35,48 @@ the License.
</scm>
<build>
<plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.7.7</version>
+ <configuration>
+ <stringType>String</stringType>
+ <detail>true</detail>
+ </configuration>
+ <executions>
+ <execution>
+ <id>schemas</id>
+ <configuration>
+ <imports>
+ <import>${basedir}/src/main/avro/types/AvroJob.avsc</import>
+ <import>${basedir}/src/main/avro/types/AvroNameValueJobInput.avsc</import>
+ <import>${basedir}/src/main/avro/types/AvroJobInput.avsc</import>
+ </imports>
+ </configuration>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>protocol</id>
+ <configuration><imports>
+ <import>${basedir}/src/main/avro/types</import>
+
+ </imports>
+ </configuration>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4</version>
@@ -81,6 +123,16 @@ the License.
</build>
<dependencies>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.oodt</groupId>
<artifactId>cas-metadata</artifactId>
<version>${project.parent.version}</version>
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroJob.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroJob.avsc b/resource/src/main/avro/types/AvroJob.avsc
new file mode 100644
index 0000000..7efa842
--- /dev/null
+++ b/resource/src/main/avro/types/AvroJob.avsc
@@ -0,0 +1,17 @@
+{
+ "type":"record",
+ "name":"AvroJob",
+ "default":null,
+ "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+ "imports":[],
+ "fields":[
+ {"name":"id","type":"string"},
+ {"name":"name","type":"string"},
+ {"name":"jobInstanceClassName","type":"string"},
+ {"name":"jobInputClassName","type":"string"},
+ {"name":"queueName","type":"string"},
+ {"name":"loadValue","type":"int"},
+ {"name":"status","type":"string"}
+
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroJobInput.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroJobInput.avsc b/resource/src/main/avro/types/AvroJobInput.avsc
new file mode 100644
index 0000000..d915769
--- /dev/null
+++ b/resource/src/main/avro/types/AvroJobInput.avsc
@@ -0,0 +1,11 @@
+{
+ "type":"record",
+ "name":"AvroJobInput",
+ "default":null,
+ "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+ "imports":["AvroNameValueJobInput.avsc"],
+ "fields":[
+ {"name":"className","type":"string"},
+ {"name":"imple","type":["AvroNameValueJobInput","null"]}
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroNameValueJobInput.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroNameValueJobInput.avsc b/resource/src/main/avro/types/AvroNameValueJobInput.avsc
new file mode 100644
index 0000000..8a1607a
--- /dev/null
+++ b/resource/src/main/avro/types/AvroNameValueJobInput.avsc
@@ -0,0 +1,10 @@
+{
+ "type":"record",
+ "name":"AvroNameValueJobInput",
+ "default":null,
+ "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+ "imports":[],
+ "fields":[
+ {"name":"props","type":[{"type":"map","values":"string"},"null"]}
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroResourceNode.avsc
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/AvroResourceNode.avsc b/resource/src/main/avro/types/AvroResourceNode.avsc
new file mode 100644
index 0000000..11509f3
--- /dev/null
+++ b/resource/src/main/avro/types/AvroResourceNode.avsc
@@ -0,0 +1,11 @@
+{
+ "type":"record",
+ "name":"AvroResourceNode",
+ "default":null,
+ "namespace":"org.apache.oodt.cas.resource.structs.avrotypes",
+ "fields":[
+ {"name":"nodeId","type":"string"},
+ {"name":"ipAddr","type":"string"},
+ {"name":"capacity","type":"int","default":0}
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/resource_manager_protocol.avdl
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/resource_manager_protocol.avdl b/resource/src/main/avro/types/resource_manager_protocol.avdl
new file mode 100644
index 0000000..8b3f43f
--- /dev/null
+++ b/resource/src/main/avro/types/resource_manager_protocol.avdl
@@ -0,0 +1,53 @@
+// Avro IDL definition of the resource manager RPC protocol.
+// The record types used in the messages below (AvroJob, AvroJobInput,
+// AvroNameValueJobInput, AvroResourceNode) are imported from the .avsc schema
+// files referenced by the import statements; everything is generated into the
+// org.apache.oodt.cas.resource.structs.avrotypes namespace.
+@namespace("org.apache.oodt.cas.resource.structs.avrotypes")
+
+protocol ResourceManager {
+import schema "AvroJob.avsc";
+import schema "AvroNameValueJobInput.avsc";
+import schema "AvroJobInput.avsc";
+import schema "AvroResourceNode.avsc";
+
+ boolean isJobComplete(string jobId);
+
+ AvroJob getJobInfo(string jobId);
+
+ boolean isAlive();
+
+ int getJobQueueSize();
+
+ int getJobQueueCapacity();
+
+ boolean killJob(string jobId);
+
+ string getExecutionNode(string jobId);
+
+ string handleJob(AvroJob exec, AvroJobInput into);
+
+ boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, string hostUrl);
+
+ array<AvroResourceNode> getNodes();
+
+ AvroResourceNode getNodeById(string nodeId);
+
+ boolean addQueue(string queueName);
+
+ boolean removeQueue(string queueName);
+
+ boolean addNode(AvroResourceNode node);
+
+ boolean removeNode(string nodeId);
+
+ boolean setNodeCapacity(string nodeId, int capacity);
+
+ boolean addNodeToQueue(string nodeId, string queueName);
+
+ boolean removeNodeFromQueue(string nodeId, string queueName);
+
+ array<string> getQueues();
+
+ array<string> getNodesInQueue(string queueName);
+
+ array<string> getQueuesWithNode(string nodeId);
+
+ string getNodeLoad(string nodeId);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/tatchmgr_protocol.avdl
----------------------------------------------------------------------
diff --git a/resource/src/main/avro/types/tatchmgr_protocol.avdl b/resource/src/main/avro/types/tatchmgr_protocol.avdl
new file mode 100644
index 0000000..3e424ff
--- /dev/null
+++ b/resource/src/main/avro/types/tatchmgr_protocol.avdl
@@ -0,0 +1,27 @@
+// Avro IDL definition of the batch manager RPC protocol: a liveness check
+// plus job execution and job kill requests. Record types are imported from
+// the .avsc schema files referenced below and generated into the
+// org.apache.oodt.cas.resource.structs.avrotypes namespace.
+@namespace("org.apache.oodt.cas.resource.structs.avrotypes")
+
+protocol AvroIntrBatchmgr {
+import schema "AvroJob.avsc";
+import schema "AvroNameValueJobInput.avsc";
+import schema "AvroJobInput.avsc";
+import schema "AvroResourceNode.avsc";
+
+ boolean isAlive();
+
+ boolean executeJob(AvroJob avroJob, AvroJobInput jobInput);
+
+// The signatures below are commented out and are not part of the protocol.
+// NOTE(review): they look like Hashtable-based executeJob overloads kept for
+// reference — confirm whether they can be deleted.
+// public boolean executeJob(Hashtable jobHash, Date jobInput);
+//
+// public boolean executeJob(Hashtable jobHash, double jobInput);
+//
+// public boolean executeJob(Hashtable jobHash, int jobInput);
+//
+// public boolean executeJob(Hashtable jobHash, boolean jobInput);
+//
+// public boolean executeJob(Hashtable jobHash, Vector jobInput);
+//
+// public boolean executeJob(Hashtable jobHash, byte[] jobInput);
+
+ boolean killJob(AvroJob jobHash);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java
new file mode 100644
index 0000000..483754f
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.JobStatus;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class AvroRpcBatchMgr implements Batchmgr {
+
+ /* our log stream */
+ private static final Logger LOG = Logger.getLogger(XmlRpcBatchMgr.class
+ .getName());
+
+ private Monitor mon;
+
+ private JobRepository repo;
+
+ private Map nodeToJobMap;
+
+ private Map specToProxyMap;
+
+ public AvroRpcBatchMgr(){
+ nodeToJobMap = new HashMap();
+ specToProxyMap = new HashMap();
+ }
+
+ @Override
+ public boolean executeRemotely(JobSpec jobSpec, ResourceNode resNode) throws JobExecutionException {
+ AvroRpcBatchMgrProxy proxy = new AvroRpcBatchMgrProxy(jobSpec,resNode,this);
+ if (!proxy.nodeAlive()) {
+ throw new JobExecutionException("Node: [" + resNode.getNodeId()
+ + "] is down: Unable to execute job!");
+ }
+
+ synchronized (this.specToProxyMap) {
+ specToProxyMap.put(jobSpec.getJob().getId(), proxy);
+ }
+
+ synchronized (this.nodeToJobMap) {
+ this.nodeToJobMap
+ .put(jobSpec.getJob().getId(), resNode.getNodeId());
+ }
+
+ proxy.start();
+
+ return true;
+
+ }
+
+ @Override
+ public void setMonitor(Monitor monitor) {
+ this.mon = monitor;
+ }
+
+ @Override
+ public void setJobRepository(JobRepository repository) {
+ this.repo = repository;
+ }
+
+ @Override
+ public String getExecutionNode(String jobId) {
+ return (String) nodeToJobMap.get(jobId);
+ }
+
+ @Override
+ public boolean killJob(String jobId, ResourceNode node) {
+ JobSpec spec = null;
+ try {
+ spec = repo.getJobById(jobId);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Unable to get job by id: [" + jobId
+ + "] to kill it: Message: " + e.getMessage());
+ return false;
+ }
+
+ AvroRpcBatchMgrProxy proxy = new AvroRpcBatchMgrProxy(spec, node, this);
+ return proxy.killJob();
+ }
+
+ protected void notifyMonitor(ResourceNode node, JobSpec jobSpec) {
+ Job job = jobSpec.getJob();
+ int reducedLoad = job.getLoadValue().intValue();
+ try {
+ mon.reduceLoad(node, reducedLoad);
+ } catch (MonitorException e) {
+ }
+ }
+
+ protected void jobSuccess(JobSpec spec) {
+ spec.getJob().setStatus(JobStatus.SUCCESS);
+ synchronized (this.nodeToJobMap) {
+ this.nodeToJobMap.remove(spec.getJob().getId());
+ }
+ synchronized (this.specToProxyMap) {
+ XmlRpcBatchMgrProxy proxy = (XmlRpcBatchMgrProxy) this.specToProxyMap
+ .remove(spec.getJob().getId());
+ if (proxy != null) {
+ proxy = null;
+ }
+ }
+
+ try {
+ repo.updateJob(spec);
+ } catch (JobRepositoryException e) {
+ LOG.log(Level.WARNING, "Error set job completion status for job: ["
+ + spec.getJob().getId() + "]: Message: " + e.getMessage());
+ }
+ }
+
+ protected void jobFailure(JobSpec spec) {
+ spec.getJob().setStatus(JobStatus.FAILURE);
+ synchronized (this.nodeToJobMap) {
+ this.nodeToJobMap.remove(spec.getJob().getId());
+ }
+ synchronized (this.specToProxyMap) {
+ XmlRpcBatchMgrProxy proxy = (XmlRpcBatchMgrProxy) this.specToProxyMap
+ .remove(spec.getJob().getId());
+ if (proxy != null) {
+ proxy = null;
+ }
+ }
+
+ try {
+ repo.updateJob(spec);
+ } catch (JobRepositoryException e) {
+ LOG.log(Level.WARNING, "Error set job completion status for job: ["
+ + spec.getJob().getId() + "]: Message: " + e.getMessage());
+ }
+ }
+
+ protected void jobKilled(JobSpec spec) {
+ spec.getJob().setStatus(JobStatus.KILLED);
+ nodeToJobMap.remove(spec.getJob().getId());
+ try {
+ repo.updateJob(spec);
+ } catch (JobRepositoryException e) {
+ LOG.log(Level.WARNING, "Error setting job killed status for job: ["
+ + spec.getJob().getId() + "]: Message: " + e.getMessage());
+ }
+ }
+
+ protected void jobExecuting(JobSpec spec) {
+ spec.getJob().setStatus(JobStatus.EXECUTED);
+ try {
+ repo.updateJob(spec);
+ } catch (JobRepositoryException e) {
+ LOG.log(Level.WARNING,
+ "Error setting job execution status for job: ["
+ + spec.getJob().getId() + "]: Message: "
+ + e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java
new file mode 100644
index 0000000..fe00741
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import org.apache.oodt.cas.resource.monitor.Monitor;
+
+/**
+ * {@link BatchmgrFactory} that creates {@link AvroRpcBatchMgr} instances.
+ */
+public class AvroRpcBatchMgrFactory implements BatchmgrFactory {
+
+  public AvroRpcBatchMgrFactory() {
+  }
+
+  /**
+   * @return a new {@link AvroRpcBatchMgr}; its monitor and job repository
+   *         still have to be set by the caller
+   */
+  @Override
+  public Batchmgr createBatchmgr() {
+    return new AvroRpcBatchMgr();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java
new file mode 100644
index 0000000..98a717a
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.system.extern.AvroRpcBatchStub;
+import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class AvroRpcBatchMgrProxy extends Thread implements Runnable {
+
+ private static final Logger LOG = Logger.getLogger(XmlRpcBatchMgrProxy.class.getName());
+
+ private JobSpec jobSpec;
+
+ private ResourceNode remoteHost;
+
+ private Transceiver client;
+
+ private AvroRpcBatchStub proxy;
+
+ private AvroRpcBatchMgr parent;
+
+ public AvroRpcBatchMgrProxy(JobSpec jobSpec, ResourceNode remoteHost,
+ AvroRpcBatchMgr par) {
+ this.jobSpec = jobSpec;
+ this.remoteHost = remoteHost;
+ this.parent = par;
+ }
+
+ public boolean nodeAlive() {
+
+ try {
+ this.client = new NettyTransceiver(new InetSocketAddress(remoteHost.getIpAddr().getPort()));
+ this.proxy = (AvroRpcBatchStub) SpecificRequestor.getClient(AvroRpcBatchStub.class, client);
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "Failed connection with the server.", e);
+ }
+
+
+
+ boolean alive = false;
+
+ try {
+ alive = proxy.isAlive();
+ } catch (AvroRemoteException e) {
+ alive = false;
+ }
+ return alive;
+
+ }
+
+ public boolean killJob() {
+
+ try {
+ this.client = new NettyTransceiver(new InetSocketAddress(remoteHost.getIpAddr().getPort()));
+ this.proxy = (AvroRpcBatchStub) SpecificRequestor.getClient(AvroRpcBatchStub.class, client);
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "Failed connection with the server.", e);
+ }
+
+
+ boolean result = false;
+ try {
+ result = proxy.killJob(AvroTypeFactory.getAvroJob(jobSpec.getJob()));
+ } catch (AvroRemoteException e) {
+ e.printStackTrace();
+ result = false;
+ }
+
+ if (result) {
+ parent.jobKilled(jobSpec);
+ }
+
+ return result;
+ }
+
+ public void run() {
+ try {
+ this.client = new NettyTransceiver(new InetSocketAddress(remoteHost.getIpAddr().getPort()));
+ this.proxy = (AvroRpcBatchStub) SpecificRequestor.getClient(AvroRpcBatchStub.class, client);
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "Failed connection with the server.", e);
+ }
+
+ boolean result = false;
+ try {
+ parent.jobExecuting(jobSpec);
+ result = proxy.executeJob(AvroTypeFactory.getAvroJob(jobSpec.getJob()),
+ AvroTypeFactory.getAvroJobInput(jobSpec.getIn()));
+ if (result)
+ parent.jobSuccess(jobSpec);
+ else
+ throw new Exception("batchstub.executeJob returned false");
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Job execution failed for jobId '" + jobSpec.getJob().getId() + "' : " + e.getMessage(), e);
+ parent.jobFailure(jobSpec);
+ }finally {
+ parent.notifyMonitor(remoteHost, jobSpec);
+ }
+
+ }
+
+
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java
new file mode 100644
index 0000000..70a2d88
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.structs;
+
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.reflect.AvroName;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.resource.structs.avrotypes.*;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * Static converters between the resource manager's own types ({@link Job},
+ * {@link JobInput}, {@link ResourceNode}) and the Avro types used on the
+ * wire ({@link AvroJob}, {@link AvroJobInput}, {@link AvroResourceNode}).
+ */
+public class AvroTypeFactory {
+
+  private static final java.util.logging.Logger LOG =
+      java.util.logging.Logger.getLogger(AvroTypeFactory.class.getName());
+
+  /** Copies every field of the Avro job into a new {@link Job}. */
+  public static Job getJob(AvroJob avroJob) {
+    Job job = new Job();
+    job.setId(avroJob.getId());
+    job.setName(avroJob.getName());
+    job.setJobInstanceClassName(avroJob.getJobInstanceClassName());
+    job.setJobInputClassName(avroJob.getJobInputClassName());
+    job.setQueueName(avroJob.getQueueName());
+    job.setLoadValue(avroJob.getLoadValue());
+    job.setStatus(avroJob.getStatus());
+
+    return job;
+  }
+
+  /** Copies every field of the job into a new {@link AvroJob}. */
+  public static AvroJob getAvroJob(Job job) {
+    AvroJob avroJob = new AvroJob();
+    avroJob.setId(job.getId());
+    avroJob.setName(job.getName());
+    avroJob.setJobInstanceClassName(job.getJobInstanceClassName());
+    avroJob.setJobInputClassName(job.getJobInputClassName());
+    avroJob.setQueueName(job.getQueueName());
+    // Load value and status must come from the source job, not from the
+    // freshly created (still empty) Avro object.
+    avroJob.setLoadValue(job.getLoadValue());
+    avroJob.setStatus(job.getStatus());
+
+    return avroJob;
+  }
+
+  /**
+   * Instantiates the {@link JobInput} class named in the Avro input and, for
+   * a {@link NameValueJobInput}, fills in its name/value pairs.
+   *
+   * @return whatever the object factory produced for the class name, with
+   *         properties applied where supported
+   */
+  public static JobInput getJobInput(AvroJobInput avroJobInput) {
+    JobInput jobInput = GenericResourceManagerObjectFactory
+        .getJobInputFromClassName(avroJobInput.getClassName());
+
+    if (!(jobInput instanceof NameValueJobInput)) {
+      return jobInput;
+    }
+
+    NameValueJobInput nameValueJobInput = (NameValueJobInput) jobInput;
+    AvroNameValueJobInput avroNameValueJobInput =
+        (AvroNameValueJobInput) avroJobInput.getImple();
+    // Both the implementation and its property map are nullable unions in
+    // the schema, so either may be absent.
+    if (avroNameValueJobInput != null && avroNameValueJobInput.getProps() != null) {
+      Map<String, String> props = avroNameValueJobInput.getProps();
+      for (Map.Entry<String, String> entry : props.entrySet()) {
+        nameValueJobInput.setNameValuePair(entry.getKey(), entry.getValue());
+      }
+    }
+    return nameValueJobInput;
+  }
+
+  /**
+   * Wraps the job input in an {@link AvroJobInput} carrying its class name
+   * and, for a {@link NameValueJobInput}, a copy of its properties.
+   */
+  public static AvroJobInput getAvroJobInput(JobInput jobInput) {
+    AvroJobInput avroJobInput = new AvroJobInput();
+    avroJobInput.setClassName(jobInput.getClass().getCanonicalName());
+
+    if (jobInput instanceof NameValueJobInput) {
+      AvroNameValueJobInput avroNameValueJobInput = new AvroNameValueJobInput();
+      avroNameValueJobInput.setProps(toStringMap(((NameValueJobInput) jobInput).getProps()));
+      avroJobInput.setImple(avroNameValueJobInput);
+    }
+    return avroJobInput;
+  }
+
+  /** Copies the entries into a String-to-String map; a null source gives an empty map. */
+  private static Map<String, String> toStringMap(Map<?, ?> source) {
+    Map<String, String> map = new HashMap<String, String>();
+    if (source == null) {
+      return map;
+    }
+    for (Map.Entry<?, ?> entry : source.entrySet()) {
+      map.put((String) entry.getKey(), (String) entry.getValue());
+    }
+    return map;
+  }
+
+  /**
+   * Converts an Avro node into a {@link ResourceNode}. If the address is not
+   * a valid URL the problem is logged and the node's address is left unset.
+   */
+  public static ResourceNode getResourceNode(AvroResourceNode avroResourceNode) {
+    ResourceNode resourceNode = new ResourceNode();
+    resourceNode.setId(avroResourceNode.getNodeId());
+    try {
+      resourceNode.setIpAddr(new URL(avroResourceNode.getIpAddr()));
+    } catch (MalformedURLException e) {
+      LOG.log(java.util.logging.Level.WARNING, "Invalid address for node: ["
+          + avroResourceNode.getNodeId() + "]: [" + avroResourceNode.getIpAddr() + "]", e);
+    }
+    resourceNode.setCapacity(avroResourceNode.getCapacity());
+    return resourceNode;
+  }
+
+  /** Converts a {@link ResourceNode} into its Avro form; the address is sent as the URL's string form. */
+  public static AvroResourceNode getAvroResourceNode(ResourceNode resourceNode) {
+    AvroResourceNode avroResourceNode = new AvroResourceNode();
+    avroResourceNode.setNodeId(resourceNode.getNodeId());
+    avroResourceNode.setIpAddr(resourceNode.getIpAddr().toString());
+    avroResourceNode.setCapacity(resourceNode.getCapacity());
+    return avroResourceNode;
+  }
+
+  /** Converts each node with {@link #getAvroResourceNode(ResourceNode)}, keeping the order. */
+  public static List<AvroResourceNode> getListAvroResourceNode(List<ResourceNode> resourceNodes) {
+    List<AvroResourceNode> avroResourceNodes = new ArrayList<AvroResourceNode>(resourceNodes.size());
+    for (ResourceNode rn : resourceNodes) {
+      avroResourceNodes.add(getAvroResourceNode(rn));
+    }
+    return avroResourceNodes;
+  }
+
+  /** Converts each node with {@link #getResourceNode(AvroResourceNode)}, keeping the order. */
+  public static List<ResourceNode> getListResourceNode(List<AvroResourceNode> avroResourceNodes) {
+    List<ResourceNode> resourceNodes = new ArrayList<ResourceNode>(avroResourceNodes.size());
+    for (AvroResourceNode arn : avroResourceNodes) {
+      resourceNodes.add(getResourceNode(arn));
+    }
+    return resourceNodes;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
index a7fcb7a..c3cc6fc 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
@@ -114,4 +114,8 @@ public class NameValueJobInput implements JobInput {
}
}
+ /**
+  * Returns the {@link Properties} object stored in this job input's
+  * {@code props} field. The field itself is returned, not a copy.
+  *
+  * @return this job input's properties
+  */
+ public Properties getProps(){
+ return this.props;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
new file mode 100644
index 0000000..47ea2df
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.oodt.cas.resource.scheduler.Scheduler;
+import org.apache.oodt.cas.resource.structs.*;
+import org.apache.oodt.cas.resource.structs.avrotypes.*;
+import org.apache.oodt.cas.resource.structs.exceptions.*;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
+import org.apache.xmlrpc.WebServer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager{
+
+ private int port = 2000;
+
+ private Logger LOG = Logger
+ .getLogger(XmlRpcResourceManager.class.getName());
+
+ private Server server;
+
+ /* our scheduler */
+ private Scheduler scheduler = null;
+
+ public AvroRpcResourceManager(int port) throws Exception{
+ // load properties from workflow manager properties file, if specified
+ if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) {
+ String configFile = System
+ .getProperty("org.apache.oodt.cas.resource.properties");
+ LOG.log(Level.INFO,
+ "Loading Resource Manager Configuration Properties from: ["
+ + configFile + "]");
+ System.getProperties().load(
+ new FileInputStream(new File(configFile)));
+ }
+
+ String schedulerClassStr = System.getProperty(
+ "resource.scheduler.factory",
+ "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory");
+
+ scheduler = GenericResourceManagerObjectFactory
+ .getSchedulerServiceFromFactory(schedulerClassStr);
+
+ // start up the scheduler
+ new Thread(scheduler).start();
+
+ this.port = port;
+
+ // start up the web server
+ server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class,this),
+ new InetSocketAddress(this.port));
+ server.start();
+
+ LOG.log(Level.INFO, "Resource Manager started by "
+ + System.getProperty("user.name", "unknown"));
+
+ }
+
+ @Override
+ public boolean isAlive() throws AvroRemoteException {
+ return true;
+ }
+
+ @Override
+ public int getJobQueueSize() throws AvroRemoteException {
+ try {
+ return this.scheduler.getJobQueue().getSize();
+ }catch (Exception e) {
+ throw new AvroRemoteException(new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e));
+ }
+ }
+
+
+ @Override
+ public int getJobQueueCapacity() throws AvroRemoteException {
+ try {
+ return this.scheduler.getJobQueue().getCapacity();
+ }catch (Exception e) {
+ throw new AvroRemoteException(new JobRepositoryException("Failed to get capacity of JobQueue : " + e.getMessage(), e));
+ }
+ }
+
+ @Override
+ public boolean isJobComplete(String jobId) throws AvroRemoteException {
+ try {
+ JobSpec spec = scheduler.getJobQueue().getJobRepository().getJobById(
+ jobId);
+ return scheduler.getJobQueue().getJobRepository().jobFinished(spec);
+
+ } catch(JobRepositoryException e ){
+ throw new AvroRemoteException(e);
+ }
+ }
+ @Override
+ public AvroJob getJobInfo(String jobId) throws AvroRemoteException {
+ JobSpec spec = null;
+
+ try {
+ spec = scheduler.getJobQueue().getJobRepository()
+ .getJobById(jobId);
+ } catch (JobRepositoryException e) {
+ LOG.log(Level.WARNING,
+ "Exception communicating with job repository for job: ["
+ + jobId + "]: Message: " + e.getMessage());
+ throw new AvroRemoteException(new JobRepositoryException("Unable to get job: [" + jobId
+ + "] from repository!"));
+ }
+
+ return AvroTypeFactory.getAvroJob(spec.getJob());
+
+ }
+
+ @Override
+ public String handleJob(AvroJob exec, AvroJobInput into) throws AvroRemoteException {
+ try {
+ return genericHandleJob(exec, into);
+ } catch (SchedulerException e) {
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ @Override
+ public boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, String hostUrl) throws AvroRemoteException {
+ try {
+ return genericHandleJob(exec,in,hostUrl);
+ } catch (JobExecutionException e) {
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ @Override
+ public List<AvroResourceNode> getNodes() throws AvroRemoteException {
+
+ List resNodes = null;
+ try {
+ resNodes = scheduler.getMonitor().getNodes();
+ } catch (MonitorException e) {
+ throw new AvroRemoteException(e);
+ }
+
+ return AvroTypeFactory.getListAvroResourceNode(resNodes);
+ }
+
+ @Override
+ public AvroResourceNode getNodeById(String nodeId) throws AvroRemoteException {
+ ResourceNode node = null;
+ try {
+ node = scheduler.getMonitor().getNodeById(nodeId);
+ } catch (MonitorException e) {
+ throw new AvroRemoteException(e);
+ }
+ return AvroTypeFactory.getAvroResourceNode(node);
+ }
+
+ @Override
+ public boolean killJob(String jobId) throws AvroRemoteException {
+ String resNodeId = scheduler.getBatchmgr().getExecutionNode(jobId);
+ if (resNodeId == null) {
+ LOG.log(Level.WARNING, "Attempt to kill job: [" + jobId
+ + "]: cannot find execution node"
+ + " (has the job already finished?)");
+ return false;
+ }
+ ResourceNode node = null;
+ try {
+ node = scheduler.getMonitor().getNodeById(resNodeId);
+ } catch (MonitorException e) {
+ throw new AvroRemoteException(e);
+ }
+ return scheduler.getBatchmgr().killJob(jobId, node);
+
+ }
+
+ @Override
+ public String getExecutionNode(String jobId) throws AvroRemoteException {
+ String execNode = scheduler.getBatchmgr().getExecutionNode(jobId);
+ if (execNode == null) {
+ LOG.log(Level.WARNING, "Job: [" + jobId
+ + "] not currently executing on any known node");
+ return "";
+ } else
+ return execNode;
+ }
+
+ @Override
+ public List<String> getQueues() throws AvroRemoteException {
+ try {
+ return this.scheduler.getQueueManager().getQueues();
+ } catch (QueueManagerException e) {
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ @Override
+ public boolean addQueue(String queueName) throws AvroRemoteException {
+ try {
+ this.scheduler.getQueueManager().addQueue(queueName);
+ } catch (QueueManagerException e) {
+ e.printStackTrace();
+ }
+ return true;
+
+ }
+
+ @Override
+ public boolean removeQueue(String queueName) throws AvroRemoteException {
+ try {
+ this.scheduler.getQueueManager().removeQueue(queueName);
+ } catch (QueueManagerException e) {
+ throw new AvroRemoteException(e);
+ }
+ return true;
+
+ }
+
+ @Override
+ public boolean addNode(AvroResourceNode node) throws AvroRemoteException {
+ try {
+ this.scheduler.getMonitor().addNode(AvroTypeFactory.getResourceNode(node));
+ } catch (MonitorException e) {
+ throw new AvroRemoteException(e);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean removeNode(String nodeId) throws AvroRemoteException {
+ try{
+ for(String queueName: this.getQueuesWithNode(nodeId)){
+ this.removeNodeFromQueue(nodeId, queueName);
+ }
+ this.scheduler.getMonitor().removeNodeById(nodeId);
+ }catch(Exception e){
+ throw new AvroRemoteException(new MonitorException(e.getMessage(), e));
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean addNodeToQueue(String nodeId, String queueName) throws AvroRemoteException {
+ try {
+ this.scheduler.getQueueManager().addNodeToQueue(nodeId, queueName);
+ } catch (QueueManagerException e) {
+ throw new AvroRemoteException(e);
+ }
+ return true;
+
+ }
+
+ @Override
+ public boolean removeNodeFromQueue(String nodeId, String queueName) throws AvroRemoteException {
+ try {
+ this.scheduler.getQueueManager().removeNodeFromQueue(nodeId, queueName);
+ } catch (QueueManagerException e) {
+ throw new AvroRemoteException(e);
+ }
+ return true;
+
+ }
+
+ @Override
+ public List<String> getNodesInQueue(String queueName) throws AvroRemoteException {
+ try {
+ return this.scheduler.getQueueManager().getNodes(queueName);
+ } catch (QueueManagerException e) {
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ @Override
+ public List<String> getQueuesWithNode(String nodeId) throws AvroRemoteException {
+ try {
+ return this.scheduler.getQueueManager().getQueues(nodeId);
+ } catch (QueueManagerException e) {
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ public boolean shutdown(){
+ if (this.server != null) {
+ this.server.close();
+ this.server = null;
+ return true;
+ } else
+ return false;
+ }
+
+ @Override
+ public String getNodeLoad(String nodeId) throws AvroRemoteException {
+ ResourceNode node = null;
+ try {
+ node = this.scheduler.getMonitor().getNodeById(nodeId);
+ int capacity = node.getCapacity();
+ int load = (this.scheduler.getMonitor().getLoad(node)) * -1 + capacity;
+ return load + "/" + capacity;
+ } catch (MonitorException e) {
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int portNum = -1;
+ String usage = "AvroRpcResourceManager --portNum <port number for xml rpc service>\n";
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("--portNum")) {
+ portNum = Integer.parseInt(args[++i]);
+ }
+ }
+
+ if (portNum == -1) {
+ System.err.println(usage);
+ System.exit(1);
+ }
+
+ AvroRpcResourceManager manager = new AvroRpcResourceManager(portNum);
+
+ for (;;)
+ try {
+ Thread.currentThread().join();
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+
+ @Override
+ public boolean setNodeCapacity(String nodeId, int capacity) throws AvroRemoteException {
+ try{
+ this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity);
+ }catch (MonitorException e){
+ LOG.log(Level.WARNING, "Exception setting capacity on node "
+ + nodeId + ": " + e.getMessage());
+ return false;
+ }
+ return true;
+
+ }
+
+
+ private String genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput)
+ throws SchedulerException {
+
+ Job exec = AvroTypeFactory.getJob(avroJob);
+ JobInput in = AvroTypeFactory.getJobInput(avroJobInput);
+ JobSpec spec = new JobSpec(in, exec);
+
+ // queue the job up
+ String jobId = null;
+
+ try {
+ jobId = scheduler.getJobQueue().addJob(spec);
+ } catch (JobQueueException e) {
+ LOG.log(Level.WARNING, "JobQueue exception adding job: Message: "
+ + e.getMessage());
+ throw new SchedulerException(e.getMessage());
+ }
+ return jobId;
+ }
+
+ private boolean genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput,
+ String urlStr) throws JobExecutionException {
+ Job exec = AvroTypeFactory.getJob(avroJob);
+ JobInput in = AvroTypeFactory.getJobInput(avroJobInput);
+
+ JobSpec spec = new JobSpec(in, exec);
+
+ URL remoteUrl = safeGetUrlFromString(urlStr);
+ ResourceNode remoteNode = null;
+
+ try {
+ remoteNode = scheduler.getMonitor().getNodeByURL(remoteUrl);
+ } catch (MonitorException e) {
+ }
+
+ if (remoteNode != null) {
+ return scheduler.getBatchmgr().executeRemotely(spec, remoteNode);
+ } else
+ return false;
+ }
+
+ private URL safeGetUrlFromString(String urlStr) {
+ URL url = null;
+
+ try {
+ url = new URL(urlStr);
+ } catch (MalformedURLException e) {
+ LOG.log(Level.WARNING, "Error converting string: [" + urlStr
+ + "] to URL object: Message: " + e.getMessage());
+ }
+
+ return url;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
new file mode 100644
index 0000000..fa0e84b
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.oodt.cas.cli.CmdLineUtility;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+ /**
+  * {@link ResourceManagerClient} that talks to a resource manager over Avro
+  * RPC using a Netty transceiver. Avro remote failures are translated into
+  * the checked exceptions declared by {@link ResourceManagerClient}.
+  */
+ public class AvroRpcResourceManagerClient implements ResourceManagerClient {
+
+   /* our log stream */
+   private static Logger LOG = Logger
+       .getLogger(AvroRpcResourceManagerClient.class.getName());
+
+   /* resource manager url */
+   private URL resMgrUrl = null;
+
+   /* transport to the server; stays null if the connection attempt failed */
+   Transceiver client;
+
+   /* avro rpc proxy for the server; stays null if the connection attempt failed */
+   ResourceManager proxy;
+
+   /**
+    * Loads the optional configuration file and connects to the resource
+    * manager at the host and port of the given URL.
+    *
+    * @param url location of the resource manager; also returned by
+    *        {@link #getResMgrUrl()}.
+    */
+   public AvroRpcResourceManagerClient(URL url) {
+     // set up the configuration, if there is any
+     String configFile = System
+         .getProperty("org.apache.oodt.cas.resource.properties");
+     if (configFile != null) {
+       LOG.log(Level.INFO,
+           "Loading Resource Manager Configuration Properties from: ["
+               + configFile + "]");
+       FileInputStream configStream = null;
+       try {
+         configStream = new FileInputStream(new File(configFile));
+         System.getProperties().load(configStream);
+       } catch (Exception e) {
+         LOG.log(Level.INFO,
+             "Error loading configuration properties from: ["
+                 + configFile + "]");
+       } finally {
+         if (configStream != null) {
+           try {
+             configStream.close();
+           } catch (IOException ignored) {
+             // nothing useful can be done if closing the config file fails
+           }
+         }
+       }
+     }
+
+     this.resMgrUrl = url;
+
+     try {
+       // connect to the host named in the URL, not just to its port
+       this.client = new NettyTransceiver(new InetSocketAddress(url.getHost(), url.getPort()));
+       proxy = (ResourceManager) SpecificRequestor.getClient(ResourceManager.class, client);
+     } catch (IOException e) {
+       LOG.log(Level.SEVERE, "Unable to connect to resource manager at: ["
+           + url + "]: Message: " + e.getMessage(), e);
+     }
+
+   }
+
+   /** Command line entry point; delegates to {@link CmdLineUtility}. */
+   public static void main(String[] args) {
+     CmdLineUtility cmdLineUtility = new CmdLineUtility();
+     cmdLineUtility.run(args);
+   }
+
+   /** Asks the server whether the job has completed. */
+   @Override
+   public boolean isJobComplete(String jobId) throws JobRepositoryException {
+     try {
+       return proxy.isJobComplete(jobId);
+     } catch (AvroRemoteException e) {
+       throw new JobRepositoryException(e);
+     }
+   }
+
+   /** Fetches the job with the given id from the server. */
+   @Override
+   public Job getJobInfo(String jobId) throws JobRepositoryException {
+     try {
+       return AvroTypeFactory.getJob(proxy.getJobInfo(jobId));
+     } catch (AvroRemoteException e) {
+       throw new JobRepositoryException(e);
+     }
+   }
+
+   /**
+    * @return the server's answer, or false when the connection was never
+    *         established or the remote call fails.
+    */
+   @Override
+   public boolean isAlive() {
+     if (proxy == null) {
+       // the constructor could not connect, so the server is not reachable
+       return false;
+     }
+     try {
+       return proxy.isAlive();
+     } catch (AvroRemoteException e) {
+       LOG.log(Level.WARNING, "Error checking whether resource manager is alive: Message: "
+           + e.getMessage(), e);
+     }
+     return false;
+   }
+
+   /** @return the job queue size reported by the server. */
+   @Override
+   public int getJobQueueSize() throws JobRepositoryException {
+     try {
+       return proxy.getJobQueueSize();
+     } catch (AvroRemoteException e) {
+       throw new JobRepositoryException(e);
+     }
+   }
+
+   /** @return the job queue capacity reported by the server. */
+   @Override
+   public int getJobQueueCapacity() throws JobRepositoryException {
+     try {
+       return proxy.getJobQueueCapacity();
+     } catch (AvroRemoteException e) {
+       throw new JobRepositoryException(e);
+     }
+   }
+
+   /** @return the server's result, or false when the remote call fails. */
+   @Override
+   public boolean killJob(String jobId) {
+     try {
+       return proxy.killJob(jobId);
+     } catch (AvroRemoteException e) {
+       LOG.log(Level.SEVERE,
+           "Server error!", e);
+     }
+     return false;
+   }
+
+   /** @return the server's result, or null when the remote call fails. */
+   @Override
+   public String getExecutionNode(String jobId) {
+     try {
+       return proxy.getExecutionNode(jobId);
+     } catch (AvroRemoteException e) {
+       LOG.log(Level.SEVERE,
+           "Server error!", e);
+     }
+     return null;
+   }
+
+   /**
+    * Submits the job to the server's job queue.
+    *
+    * @return the job id returned by the server.
+    * @throws JobExecutionException if the remote call fails, matching
+    *         {@link #submitJob(Job, JobInput, URL)}.
+    */
+   @Override
+   public String submitJob(Job exec, JobInput in) throws JobExecutionException {
+     try {
+       return proxy.handleJob(AvroTypeFactory.getAvroJob(exec),AvroTypeFactory.getAvroJobInput(in));
+     } catch (AvroRemoteException e) {
+       LOG.log(Level.SEVERE,
+           "Server error!", e);
+       throw new JobExecutionException(e);
+     }
+   }
+
+   /** Submits the job for execution on the node at the given URL. */
+   @Override
+   public boolean submitJob(Job exec, JobInput in, URL hostUrl) throws JobExecutionException {
+     try {
+       return proxy.handleJobWithUrl(AvroTypeFactory.getAvroJob(exec), AvroTypeFactory.getAvroJobInput(in), hostUrl.toString());
+     } catch (AvroRemoteException e) {
+       throw new JobExecutionException(e);
+     }
+   }
+
+   /** @return the server's nodes converted to {@link ResourceNode}s. */
+   @Override
+   public List getNodes() throws MonitorException {
+     try {
+       return AvroTypeFactory.getListResourceNode(proxy.getNodes());
+     } catch (AvroRemoteException e) {
+       throw new MonitorException(e);
+     }
+   }
+
+   /** @return the server's node for the given id as a {@link ResourceNode}. */
+   @Override
+   public ResourceNode getNodeById(String nodeId) throws MonitorException {
+     try {
+       return AvroTypeFactory.getResourceNode(proxy.getNodeById(nodeId));
+     } catch (AvroRemoteException e) {
+       throw new MonitorException(e);
+     }
+   }
+
+   /** @return the resMgrUrl */
+   @Override
+   public URL getResMgrUrl() {
+     return this.resMgrUrl;
+   }
+
+   /**
+    * Records a new resource manager URL; only the stored value changes, the
+    * existing connection is not re-established.
+    */
+   @Override
+   public void setResMgrUrl(URL resMgrUrl) {
+     this.resMgrUrl = resMgrUrl;
+   }
+
+   /** Asks the server to add a queue. */
+   @Override
+   public void addQueue(String queueName) throws QueueManagerException {
+     try {
+       proxy.addQueue(queueName);
+     } catch (AvroRemoteException e) {
+       throw new QueueManagerException(e);
+     }
+   }
+
+   /** Asks the server to remove a queue. */
+   @Override
+   public void removeQueue(String queueName) throws QueueManagerException {
+     try {
+       proxy.removeQueue(queueName);
+     } catch (AvroRemoteException e) {
+       throw new QueueManagerException(e);
+     }
+
+   }
+
+   /** Asks the server to add a node. */
+   @Override
+   public void addNode(ResourceNode node) throws MonitorException {
+     try {
+       proxy.addNode(AvroTypeFactory.getAvroResourceNode(node));
+     } catch (AvroRemoteException e) {
+       throw new MonitorException(e);
+     }
+   }
+
+   /** Asks the server to remove a node. */
+   @Override
+   public void removeNode(String nodeId) throws MonitorException {
+     try {
+       proxy.removeNode(nodeId);
+     } catch (AvroRemoteException e) {
+       throw new MonitorException(e);
+     }
+   }
+
+   /** Asks the server to set the capacity of a node. */
+   @Override
+   public void setNodeCapacity(String nodeId, int capacity) throws MonitorException {
+     try {
+       proxy.setNodeCapacity(nodeId,capacity);
+     } catch (AvroRemoteException e) {
+       throw new MonitorException(e);
+     }
+   }
+
+   /** Asks the server to add a node to a queue. */
+   @Override
+   public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
+     try {
+       proxy.addNodeToQueue(nodeId,queueName);
+     } catch (AvroRemoteException e) {
+       throw new QueueManagerException(e);
+     }
+   }
+
+   /** Asks the server to remove a node from a queue. */
+   @Override
+   public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
+     try {
+       proxy.removeNodeFromQueue(nodeId,queueName);
+     } catch (AvroRemoteException e) {
+       throw new QueueManagerException(e);
+     }
+   }
+
+   /** @return the queue names reported by the server. */
+   @Override
+   public List<String> getQueues() throws QueueManagerException {
+     try {
+       return proxy.getQueues();
+     } catch (AvroRemoteException e) {
+       throw new QueueManagerException(e);
+     }
+   }
+
+   /** @return the node ids the server reports for the given queue. */
+   @Override
+   public List<String> getNodesInQueue(String queueName) throws QueueManagerException {
+     try {
+       return proxy.getNodesInQueue(queueName);
+     } catch (AvroRemoteException e) {
+       throw new QueueManagerException(e);
+     }
+   }
+
+   /** @return the queue names the server reports for the given node. */
+   @Override
+   public List<String> getQueuesWithNode(String nodeId) throws QueueManagerException {
+     try {
+       return proxy.getQueuesWithNode(nodeId);
+     } catch (AvroRemoteException e) {
+       throw new QueueManagerException(e);
+     }
+   }
+
+   /** @return the load string the server reports for the given node. */
+   @Override
+   public String getNodeLoad(String nodeId) throws MonitorException {
+     try {
+       return proxy.getNodeLoad(nodeId);
+     } catch (AvroRemoteException e) {
+       throw new MonitorException(e);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
new file mode 100644
index 0000000..5cbf6d3
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.oodt.cas.resource.structs.exceptions.*;
+
+import java.util.Date;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Vector;
+
+ /**
+  * Server-side contract for a resource manager, independent of the RPC
+  * transport that exposes it.
+  */
+ public interface ResourceManager {
+
+   /**
+    * Shuts this resource manager down.
+    *
+    * @return presumably true when something was actually shut down; the Avro
+    *         implementation added in this commit returns true after closing
+    *         its server and false when there was no server to close.
+    */
+   boolean shutdown();
+
+ }
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
new file mode 100644
index 0000000..dd4444b
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system;
+
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+import java.net.URL;
+import java.util.List;
+
+ /**
+  * Client-side contract for talking to a resource manager, independent of
+  * the RPC transport (XML-RPC or Avro RPC) used by the implementation.
+  */
+ public interface ResourceManagerClient {
+   /** Checks whether the job with the given id has completed. */
+   boolean isJobComplete(String jobId) throws JobRepositoryException;
+
+   /** Retrieves the job with the given id. */
+   Job getJobInfo(String jobId) throws JobRepositoryException;
+
+   /** Liveness check of the resource manager this client talks to. */
+   boolean isAlive();
+
+   /** @return Number of Jobs in JobQueue */
+   int getJobQueueSize() throws JobRepositoryException;
+
+   /** @return Max number of Jobs */
+   int getJobQueueCapacity() throws JobRepositoryException;
+
+   /** Requests that the job with the given id be killed. */
+   boolean killJob(String jobId);
+
+   /** Looks up the node on which the job with the given id is executing. */
+   String getExecutionNode(String jobId);
+
+   /** Submits a job and returns the id assigned to it. */
+   String submitJob(Job exec, JobInput in) throws JobExecutionException;
+
+   /** Submits a job for execution on the node identified by hostUrl. */
+   boolean submitJob(Job exec, JobInput in, URL hostUrl)
+       throws JobExecutionException;
+
+   /**
+    * Lists the known nodes. The list is raw; the Avro implementation returns
+    * {@link ResourceNode} elements.
+    */
+   List getNodes() throws MonitorException;
+
+   /** Retrieves the node with the given id. */
+   ResourceNode getNodeById(String nodeId) throws MonitorException;
+
+   /** @return the resMgrUrl */
+   URL getResMgrUrl();
+
+   /** @param resMgrUrl the resMgrUrl to set */
+   void setResMgrUrl(URL resMgrUrl);
+
+   /** @param queueName The name of the queue to be created */
+   void addQueue(String queueName) throws QueueManagerException;
+
+   /** @param queueName The name of the queue to be removed */
+   void removeQueue(String queueName) throws QueueManagerException;
+
+   /** @param node The node to be added */
+   void addNode(ResourceNode node) throws MonitorException;
+
+   /** @param nodeId The id of the node to be removed */
+   void removeNode(String nodeId) throws MonitorException;
+
+   /** Sets the capacity of the node with the given id. */
+   void setNodeCapacity(String nodeId, int capacity) throws MonitorException;
+
+   /** @param queueName The name of the queue to add the given node */
+   void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException;
+
+   /** @param queueName The name of the queue from which to remove the given node */
+   void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException;
+
+   /** @return A list of currently supported queue names */
+   List<String> getQueues() throws QueueManagerException;
+
+   /** @return List of node ids in the given queueName */
+   List<String> getNodesInQueue(String queueName) throws QueueManagerException;
+
+   /** @return List of queues which contain the give node */
+   List<String> getQueuesWithNode(String nodeId) throws QueueManagerException;
+
+   /** @return A String showing a fraction of the loads node over its capacity */
+   String getNodeLoad(String nodeId) throws MonitorException;
+ }
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
index 0fb0520..9110807 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java
@@ -56,7 +56,7 @@ import java.io.IOException;
* </p>
*
*/
-public class XmlRpcResourceManagerClient {
+public class XmlRpcResourceManagerClient implements ResourceManagerClient {
/* our xml rpc client */
private XmlRpcClient client = null;
@@ -119,6 +119,7 @@ public class XmlRpcResourceManagerClient {
cmdLineUtility.run(args);
}
+ @Override
public boolean isJobComplete(String jobId) throws JobRepositoryException {
Vector argList = new Vector();
argList.add(jobId);
@@ -137,6 +138,7 @@ public class XmlRpcResourceManagerClient {
return complete;
}
+ @Override
public Job getJobInfo(String jobId) throws JobRepositoryException {
Vector argList = new Vector();
argList.add(jobId);
@@ -155,6 +157,7 @@ public class XmlRpcResourceManagerClient {
return XmlRpcStructFactory.getJobFromXmlRpc(jobHash);
}
+ @Override
public boolean isAlive() {
Vector argList = new Vector();
@@ -174,6 +177,7 @@ public class XmlRpcResourceManagerClient {
* @return Number of Jobs in JobQueue
* @throws JobRepositoryException On Any Exception
*/
+ @Override
public int getJobQueueSize() throws JobRepositoryException {
try {
Vector argList = new Vector();
@@ -188,6 +192,7 @@ public class XmlRpcResourceManagerClient {
* @return Max number of Jobs
* @throws JobRepositoryException On Any Exception
*/
+ @Override
public int getJobQueueCapacity() throws JobRepositoryException {
try {
Vector argList = new Vector();
@@ -197,6 +202,7 @@ public class XmlRpcResourceManagerClient {
}
}
+ @Override
public boolean killJob(String jobId) {
Vector argList = new Vector();
argList.add(jobId);
@@ -211,6 +217,7 @@ public class XmlRpcResourceManagerClient {
}
}
+ @Override
public String getExecutionNode(String jobId) {
Vector argList = new Vector();
argList.add(jobId);
@@ -224,6 +231,7 @@ public class XmlRpcResourceManagerClient {
}
}
+ @Override
public String submitJob(Job exec, JobInput in) throws JobExecutionException {
Vector argList = new Vector();
argList.add(XmlRpcStructFactory.getXmlRpcJob(exec));
@@ -245,6 +253,7 @@ public class XmlRpcResourceManagerClient {
}
+ @Override
public boolean submitJob(Job exec, JobInput in, URL hostUrl)
throws JobExecutionException {
Vector argList = new Vector();
@@ -267,6 +276,7 @@ public class XmlRpcResourceManagerClient {
}
+ @Override
public List getNodes() throws MonitorException {
Vector argList = new Vector();
@@ -285,6 +295,7 @@ public class XmlRpcResourceManagerClient {
}
+ @Override
public ResourceNode getNodeById(String nodeId) throws MonitorException {
Vector argList = new Vector();
argList.add(nodeId);
@@ -307,6 +318,7 @@ public class XmlRpcResourceManagerClient {
/**
* @return the resMgrUrl
*/
+ @Override
public URL getResMgrUrl() {
return resMgrUrl;
}
@@ -315,6 +327,7 @@ public class XmlRpcResourceManagerClient {
* @param resMgrUrl
* the resMgrUrl to set
*/
+ @Override
public void setResMgrUrl(URL resMgrUrl) {
this.resMgrUrl = resMgrUrl;
}
@@ -324,6 +337,7 @@ public class XmlRpcResourceManagerClient {
* @param queueName The name of the queue to be created
* @throws QueueManagerException on any error
*/
+ @Override
public void addQueue(String queueName) throws QueueManagerException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -339,6 +353,7 @@ public class XmlRpcResourceManagerClient {
* @param queueName The name of the queue to be removed
* @throws QueueManagerException on any error
*/
+ @Override
public void removeQueue(String queueName) throws QueueManagerException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -354,6 +369,7 @@ public class XmlRpcResourceManagerClient {
* @param node The node to be added
* @throws MonitorException on any error
*/
+ @Override
public void addNode(ResourceNode node) throws MonitorException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -369,6 +385,7 @@ public class XmlRpcResourceManagerClient {
* @param nodeId The id of the node to be removed
* @throws MonitorException on any error
*/
+ @Override
public void removeNode(String nodeId) throws MonitorException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -379,6 +396,7 @@ public class XmlRpcResourceManagerClient {
}
}
+ @Override
public void setNodeCapacity(String nodeId, int capacity) throws MonitorException{
try{
Vector<Object> argList = new Vector<Object>();
@@ -396,6 +414,7 @@ public class XmlRpcResourceManagerClient {
* @param queueName The name of the queue to add the given node
* @throws QueueManagerException on any error
*/
+ @Override
public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -413,6 +432,7 @@ public class XmlRpcResourceManagerClient {
* @param queueName The name of the queue from which to remove the given node
* @throws QueueManagerException on any error
*/
+ @Override
public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -429,6 +449,7 @@ public class XmlRpcResourceManagerClient {
* @return A list of currently supported queue names
* @throws QueueManagerException on any error
*/
+ @Override
public List<String> getQueues() throws QueueManagerException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -444,6 +465,7 @@ public class XmlRpcResourceManagerClient {
* @return List of node ids in the given queueName
* @throws QueueManagerException on any error
*/
+ @Override
public List<String> getNodesInQueue(String queueName) throws QueueManagerException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -460,6 +482,7 @@ public class XmlRpcResourceManagerClient {
* @return List of queues which contain the give node
* @throws QueueManagerException on any error
*/
+ @Override
public List<String> getQueuesWithNode(String nodeId) throws QueueManagerException {
try {
Vector<Object> argList = new Vector<Object>();
@@ -476,6 +499,7 @@ public class XmlRpcResourceManagerClient {
* @return A String showing a fraction of the loads node over its capacity
* @throws MonitorException on any error
*/
+ @Override
public String getNodeLoad(String nodeId) throws MonitorException{
try{
Vector<Object> argList = new Vector<Object>();
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java
new file mode 100644
index 0000000..2d55d19
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.system.extern;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroIntrBatchmgr;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJob;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroJobInput;
+import org.apache.oodt.cas.resource.structs.avrotypes.AvroResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
+import org.apache.xmlrpc.WebServer;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Avro RPC batch stub. Exposes the {@link AvroIntrBatchmgr} protocol through a
+ * {@link NettyServer}, runs every submitted job on its own thread and keeps
+ * that thread reachable by job id so that {@link #killJob(AvroJob)} can
+ * interrupt it while it is running.
+ */
+public class AvroRpcBatchStub implements AvroIntrBatchmgr {
+
+  /* the port to run the Avro RPC server on, default is 2000 */
+  private int port = 2000;
+
+  /* our avro rpc server */
+  Server server;
+
+  /* our log stream */
+  private static final Logger LOG = Logger.getLogger(AvroRpcBatchStub.class
+      .getName());
+
+  /*
+   * job id -> thread currently executing that job. Created eagerly so that a
+   * request arriving right after server.start() never sees a null map. Every
+   * access synchronizes on the map itself.
+   */
+  private static final Map<String, Thread> jobThreadMap = new HashMap<String, Thread>();
+
+  /**
+   * Creates the stub and immediately starts serving Avro RPC requests.
+   *
+   * @param port the TCP port the Avro RPC server listens on
+   * @throws Exception if the server cannot be created or started
+   */
+  public AvroRpcBatchStub(int port) throws Exception {
+    this.port = port;
+
+    // start up the Avro RPC server
+    server = new NettyServer(new SpecificResponder(AvroIntrBatchmgr.class, this),
+        new InetSocketAddress(this.port));
+    server.start();
+
+    LOG.log(Level.INFO, "AvroRpc Batch Stub started by "
+        + System.getProperty("user.name", "unknown"));
+  }
+
+  /**
+   * Liveness probe.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isAlive() throws AvroRemoteException {
+    return true;
+  }
+
+  /**
+   * Executes the given job and blocks until it has finished.
+   *
+   * @param avroJob the job description
+   * @param jobInput the input handed to the job instance
+   * @return {@code true} if the job instance reported success
+   * @throws AvroRemoteException wrapping any {@link JobException}
+   */
+  @Override
+  public boolean executeJob(AvroJob avroJob, AvroJobInput jobInput) throws AvroRemoteException {
+    try {
+      return genericExecuteJob(avroJob, jobInput);
+    } catch (JobException e) {
+      throw new AvroRemoteException(e);
+    }
+  }
+
+  /**
+   * Interrupts the thread that is currently running the given job.
+   *
+   * @param jobHash the job to kill; only its id is used for the lookup
+   * @return {@code true} if a running thread was found and interrupted,
+   *         {@code false} if this stub is not running a job with that id
+   */
+  @Override
+  public boolean killJob(AvroJob jobHash) throws AvroRemoteException {
+    Job job = AvroTypeFactory.getJob(jobHash);
+    Thread jobThread;
+    synchronized (jobThreadMap) {
+      jobThread = jobThreadMap.get(job.getId());
+    }
+    if (jobThread == null) {
+      LOG.log(Level.WARNING, "Job: [" + job.getId()
+          + "] not managed by this batch stub");
+      return false;
+    }
+
+    // okay, so interrupt it, which should cause it to stop
+    jobThread.interrupt();
+    return true;
+  }
+
+  /**
+   * Instantiates the job instance and its input, runs the job on a dedicated
+   * thread (so that it can be interrupted) and waits for it to finish.
+   *
+   * @return the job's own success flag, or {@code false} if the job could not
+   *         be set up or the wait was interrupted
+   */
+  private boolean genericExecuteJob(AvroJob avroJob, AvroJobInput jobInput)
+      throws JobException {
+    String jobId = null;
+    Thread threadRunner = null;
+    try {
+      Job job = AvroTypeFactory.getJob(avroJob);
+
+      LOG.log(Level.INFO, "stub attempting to execute class: ["
+          + job.getJobInstanceClassName() + "]");
+
+      JobInstance exec = GenericResourceManagerObjectFactory
+          .getJobInstanceFromClassName(job.getJobInstanceClassName());
+      JobInput in = AvroTypeFactory.getJobInput(jobInput);
+
+      // create threaded job so that it can be interrupted
+      RunnableJob runner = new RunnableJob(exec, in);
+      threadRunner = new Thread(runner);
+      jobId = job.getId();
+      /* save this job thread in a map so we can kill it later */
+      synchronized (jobThreadMap) {
+        jobThreadMap.put(jobId, threadRunner);
+      }
+      threadRunner.start();
+
+      try {
+        threadRunner.join();
+      } catch (InterruptedException e) {
+        // NOTE(review): the interrupt status is not restored, matching the
+        // previous behavior; confirm whether the RPC worker thread should see it.
+        LOG.log(Level.INFO, "Current job: [" + job.getName()
+            + "]: killed: exiting gracefully");
+        return false;
+      }
+
+      return runner.wasSuccessful();
+    } catch (Exception e) {
+      LOG.log(Level.SEVERE, "Unable to execute job: " + e.getMessage(), e);
+      return false;
+    } finally {
+      // always drop the bookkeeping entry, otherwise finished jobs pile up
+      // in the map and killJob() would claim to have killed them
+      forgetJobThread(jobId, threadRunner);
+    }
+  }
+
+  /**
+   * Removes the id -> thread mapping, but only if it still points at the given
+   * thread, so a newer job registered under the same id is left untouched.
+   */
+  private static void forgetJobThread(String jobId, Thread jobThread) {
+    if (jobThread == null) {
+      return;
+    }
+    synchronized (jobThreadMap) {
+      if (jobThreadMap.get(jobId) == jobThread) {
+        jobThreadMap.remove(jobId);
+      }
+    }
+  }
+
+  /** Runs one job instance and remembers whether it succeeded. */
+  private static class RunnableJob implements Runnable {
+
+    private final JobInput in;
+
+    private final JobInstance job;
+
+    /* written by the job thread, read by the caller after join() */
+    private boolean successful;
+
+    public RunnableJob(JobInstance job, JobInput in) {
+      this.job = job;
+      this.in = in;
+      this.successful = false;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+      try {
+        this.successful = job.execute(in);
+      } catch (JobInputException e) {
+        LOG.log(Level.WARNING, "Job input rejected: " + e.getMessage(), e);
+        this.successful = false;
+      }
+    }
+
+    public boolean wasSuccessful() {
+      return this.successful;
+    }
+  }
+
+  /**
+   * Command line entry point: starts an Avro RPC batch stub on the port given
+   * with {@code --portNum} and then blocks forever.
+   */
+  public static void main(String[] args) throws Exception {
+    int portNum = -1;
+    String usage = "AvroRpcBatchStub --portNum <port number for avro rpc service>\n";
+
+    for (int i = 0; i < args.length; i++) {
+      // only consume a value if one actually follows the flag
+      if (args[i].equals("--portNum") && i + 1 < args.length) {
+        portNum = Integer.parseInt(args[++i]);
+      }
+    }
+
+    if (portNum == -1) {
+      System.err.println(usage);
+      System.exit(1);
+    }
+
+    // the constructor starts the server; no further reference is needed
+    new AvroRpcBatchStub(portNum);
+
+    // keep the main thread parked: joining the current thread never returns
+    for (;;) {
+      try {
+        Thread.currentThread().join();
+      } catch (InterruptedException ignore) {
+        // deliberately ignored: keep waiting
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java b/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java
new file mode 100644
index 0000000..0fa28ef
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+import junit.framework.TestCase;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.system.extern.AvroRpcBatchStub;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Checks that an {@link AvroRpcBatchMgrProxy} can reach a locally started
+ * {@link AvroRpcBatchStub}.
+ */
+public class TestBatchMgr extends TestCase {
+
+  /**
+   * Starts a stub on port 50001 and asserts that a proxy pointed at that
+   * address reports the node as alive.
+   *
+   * @throws Exception if the stub cannot be started or the node URL is invalid
+   */
+  public void testAvroBatchMgr() throws Exception {
+    AvroRpcBatchMgrFactory avroRpcBatchMgrFactory = new AvroRpcBatchMgrFactory();
+    Batchmgr batchmgr = avroRpcBatchMgrFactory.createBatchmgr();
+    assertNotNull(batchmgr);
+
+    // the constructor starts the server; any failure surfaces as a test error
+    new AvroRpcBatchStub(50001);
+
+    ResourceNode resNode = new ResourceNode();
+    // was "http//:localhost:50001", which URL rejects with "no protocol"
+    resNode.setIpAddr(new URL("http://localhost:50001"));
+
+    // hand the configured node to the proxy (previously a blank node was passed)
+    AvroRpcBatchMgrProxy bmc = new AvroRpcBatchMgrProxy(new JobSpec(), resNode,
+        (AvroRpcBatchMgr) batchmgr);
+
+    assertTrue(bmc.nodeAlive());
+  }
+}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java b/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java
new file mode 100644
index 0000000..1ad4e18
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.structs;
+
+import junit.framework.TestCase;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Properties;
+
+/**
+ * Round-trip tests for {@link AvroTypeFactory}: each test converts a resource
+ * manager struct to its Avro form and back, then compares the result with the
+ * original.
+ */
+public class TestAvroTypeFactory extends TestCase {
+
+  /** A job input created by class name keeps its id across the round trip. */
+  public void testAvroJobInput() {
+    JobInput jobInput = GenericResourceManagerObjectFactory
+        .getJobInputFromClassName("org.apache.oodt.cas.resource.structs.NameValueJobInput");
+
+    assertNotNull(jobInput);
+    Properties properties = new Properties();
+    properties.setProperty("key", "prop1");
+    jobInput.configure(properties);
+
+    JobInput afterJobInput = AvroTypeFactory.getJobInput(AvroTypeFactory.getAvroJobInput(jobInput));
+
+    assertNotNull(afterJobInput);
+    // expected value first, so a failure message reads the right way round
+    assertEquals(jobInput.getId(), afterJobInput.getId());
+  }
+
+  /** Every field set on a job keeps its value across the round trip. */
+  public void testAvroJob() {
+    Job initJob = new Job();
+
+    initJob.setId("id");
+    initJob.setJobInputClassName("classname");
+    initJob.setJobInstanceClassName("instClassName");
+    initJob.setLoadValue(42);
+    initJob.setQueueName("queueName");
+    initJob.setStatus("status");
+    initJob.setName("name");
+
+    Job afterJob = AvroTypeFactory.getJob(AvroTypeFactory.getAvroJob(initJob));
+
+    assertEquals("id", afterJob.getId());
+    assertEquals("classname", afterJob.getJobInputClassName());
+    assertEquals("instClassName", afterJob.getJobInstanceClassName());
+    assertEquals(Integer.valueOf(42), afterJob.getLoadValue());
+    assertEquals("name", afterJob.getName());
+    assertEquals("queueName", afterJob.getQueueName());
+    assertEquals("status", afterJob.getStatus());
+  }
+
+  /** A name/value pair and the input id both come back unchanged. */
+  public void testNameValueJobInput() {
+    NameValueJobInput initNameValueJobInput = new NameValueJobInput();
+
+    initNameValueJobInput.setNameValuePair("name", "value");
+
+    NameValueJobInput afterNameValueJobInput = (NameValueJobInput) AvroTypeFactory.getJobInput(
+        AvroTypeFactory.getAvroJobInput(initNameValueJobInput));
+
+    assertEquals(initNameValueJobInput.getId(), afterNameValueJobInput.getId());
+    assertEquals("value", afterNameValueJobInput.getProps().getProperty("name"));
+  }
+
+  /**
+   * Capacity, address and node id of a resource node come back unchanged.
+   *
+   * @throws MalformedURLException if the fixture URL is invalid
+   */
+  public void testAvroResourceNode() throws MalformedURLException {
+    ResourceNode initResourceNode = new ResourceNode();
+
+    initResourceNode.setCapacity(42);
+    initResourceNode.setId("id");
+    // was "http//:localhost", which URL rejects with "no protocol"
+    initResourceNode.setIpAddr(new URL("http://localhost"));
+
+    ResourceNode afterResourceNode = AvroTypeFactory.getResourceNode(
+        AvroTypeFactory.getAvroResourceNode(initResourceNode));
+
+    assertEquals(initResourceNode.getCapacity(), afterResourceNode.getCapacity());
+    assertEquals(initResourceNode.getIpAddr(), afterResourceNode.getIpAddr());
+    assertEquals(initResourceNode.getNodeId(), afterResourceNode.getNodeId());
+  }
+}