You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by st...@apache.org on 2014/10/24 23:38:01 UTC
svn commit: r1634142 - in /oodt/trunk/resource/src:
main/java/org/apache/oodt/cas/resource/mux/
main/java/org/apache/oodt/cas/resource/structs/exceptions/
main/java/org/apache/oodt/cas/resource/util/ main/resources/
main/resources/examples/ test/org/ap...
Author: starchmd
Date: Fri Oct 24 21:38:01 2014
New Revision: 1634142
URL: http://svn.apache.org/r1634142
Log:
Submitting multiplexing backend for resource manager
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java
oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java
Modified:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java
oodt/trunk/resource/src/main/resources/resource.properties
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.mux;
+
+import java.util.List;
+
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.scheduler.Scheduler;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+/**
+ * Registry contract that ties each queue name to its backend set (monitor, batch manager, scheduler).
+ *
+ * @author starchmd
+ */
+public interface BackendManager {
+
+    /**
+     * Registers a backend set under the given queue name.
+     * @param queue - name of the queue served by the monitor, batchmgr, and scheduler given here
+     * @param monitor - monitor belonging to the set
+     * @param batchmgr - batch manager belonging to the set
+     * @param scheduler - scheduler belonging to the set
+     */
+    void addSet(String queue, Monitor monitor, Batchmgr batchmgr, Scheduler scheduler);
+    /**
+     * Looks up the monitor registered for a queue.
+     * @param queue - name of the queue to look up
+     * @return monitor registered for that queue
+     * @throws QueueManagerException when queue does not exist
+     */
+    Monitor getMonitor(String queue) throws QueueManagerException;
+    /**
+     * Looks up the batch manager registered for a queue.
+     * @param queue - name of the queue to look up
+     * @return batch manager registered for that queue
+     * @throws QueueManagerException when queue does not exist
+     */
+    Batchmgr getBatchmgr(String queue) throws QueueManagerException;
+    /**
+     * Looks up the scheduler registered for a queue.
+     * @param queue - name of the queue to look up
+     * @return scheduler registered for that queue
+     * @throws QueueManagerException when queue does not exist
+     */
+    Scheduler getScheduler(String queue) throws QueueManagerException;
+    /**
+     * Lists the monitors known to this manager.
+     * @return list of all monitors
+     */
+    List<Monitor> getMonitors();
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.mux;
+
+import org.apache.oodt.cas.resource.structs.exceptions.RepositoryException;
+
+/**
+ * Loads the configuration describing which queues are served by which backend.
+ *
+ * @author starchmd
+ */
+public interface BackendRepository {
+    /**
+     * Builds a BackendManager from the stored configuration.
+     * @return BackendManager all set up and ready to go
+     * @throws RepositoryException declared for load failures; exact conditions are implementation-specific
+     */
+    BackendManager load() throws RepositoryException;
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.mux;
+
+/**
+ * <p>
+ * Factory interface through which backend repositories are obtained.
+ * </p>
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ */
+public interface BackendRepositoryFactory {
+
+    /**
+     * Produces a backend repository.
+     * @return the backend repository supplied by this factory
+     */
+    BackendRepository createBackendRepository();
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.mux;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * A batch manager that hands each call to the batch manager of the backend serving the job's queue.
+ */
+public class QueueMuxBatchManager implements Batchmgr {
+
+    private static final Logger LOG = Logger.getLogger(QueueMuxBatchManager.class.getName());
+
+    BackendManager backend;
+    Map<String,String> jobIdToQueue = new HashMap<String,String>();
+    JobRepository repo;
+
+    /**
+     * ctor
+     * @param bm - backend manager used to look up the batch manager of a queue
+     */
+    public QueueMuxBatchManager(BackendManager bm) {
+        setBackendManager(bm);
+    }
+    /**
+     * Set the backend manager.
+     * @param backend - backend manager effectively mapping queues to sets of backends.
+     */
+    public void setBackendManager(BackendManager backend) {
+        this.backend = backend;
+    }
+
+    /* Records which queue the job belongs to, then runs the job through that queue's batch manager.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#executeRemotely(org.apache.oodt.cas.resource.structs.JobSpec, org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public boolean executeRemotely(JobSpec job, ResourceNode resNode)
+            throws JobExecutionException {
+        try {
+            jobIdToQueue.put(job.getJob().getId(),job.getJob().getQueueName());
+            return getManagerByQueue(job.getJob().getQueueName()).executeRemotely(job, resNode);
+        } catch (QueueManagerException e) {
+            //Roll back the mapping; the map is keyed by job id, not by queue name
+            jobIdToQueue.remove(job.getJob().getId());
+            LOG.log(Level.WARNING, "Exception recieved while executing job: "+e.getLocalizedMessage()+". Job will not execute.");
+            throw new JobExecutionException(e);
+        }
+    }
+
+    /* Always rejected: monitors come from the backend manager, so a single one cannot be set here.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#setMonitor(org.apache.oodt.cas.resource.monitor.Monitor)
+     */
+    @Override
+    public void setMonitor(Monitor monitor) {
+        throw new UnsupportedOperationException("Cannot set the monitor when using the queue-mux batch manager.");
+    }
+
+    /* Stores the job repository on this instance.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#setJobRepository(org.apache.oodt.cas.resource.jobrepo.JobRepository)
+     */
+    @Override
+    public void setJobRepository(JobRepository repository) {
+        this.repo = repository;
+    }
+
+    /* Kills a job via the batch manager of the queue recorded for it; false when the backend lookup throws.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#killJob(java.lang.String, org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public boolean killJob(String jobId, ResourceNode node) {
+        try {
+            return getManagerByJob(jobId).killJob(jobId,node);
+        } catch (QueueManagerException e) {
+            LOG.log(Level.SEVERE, "Cannot kill job: "+e.getLocalizedMessage());
+        }
+        return false;
+    }
+
+    /* Asks the batch manager of the job's recorded queue for the execution node; null when the backend lookup throws.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#getExecutionNode(java.lang.String)
+     */
+    @Override
+    public String getExecutionNode(String jobId) {
+        try {
+            return getManagerByJob(jobId).getExecutionNode(jobId);
+        } catch (QueueManagerException e) {
+            LOG.log(Level.SEVERE, "Cannot get exectuion node for job: "+e.getLocalizedMessage());
+        }
+        return null;
+    }
+
+    private Batchmgr getManagerByJob(String jobId) throws QueueManagerException {
+        return getManagerByQueue(jobIdToQueue.get(jobId));
+    }
+
+    private Batchmgr getManagerByQueue(String queue) throws QueueManagerException {
+        return this.backend.getBatchmgr(queue);
+    }
+
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.mux;
+
+import java.net.URL;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+/**
+ * A monitor that fans calls out to the monitors of the multiple backends.
+ * @author starchmd
+ * @version $Revision$
+ */
+public class QueueMuxMonitor implements Monitor {
+    private static final Logger LOG = Logger.getLogger(QueueMuxMonitor.class.getName());
+    private BackendManager backend;
+    private QueueManager qManager;
+    /**
+     * ctor
+     * @param backend - backend manager
+     * @param qManager - queue manager
+     */
+    public QueueMuxMonitor(BackendManager backend, QueueManager qManager) {
+        setBackendManager(backend,qManager);
+    }
+    /**
+     * Set the backend manager and the queue manager.
+     * @param backend - backend manager effectively mapping queues to sets of backends.
+     * @param qManager - queue manager used to find the queues a node belongs to
+     */
+    public void setBackendManager(BackendManager backend, QueueManager qManager) {
+        this.backend = backend;
+        this.qManager = qManager;
+    }
+
+    /* Reports the highest load returned by the monitors of the node's queues (0 when there are none).
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getLoad(org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public int getLoad(ResourceNode node) throws MonitorException {
+        //Unclear what to do here.
+        //Assuming we should never be more than "Max"
+        List<String> queues = queuesForNode(node);
+        int max = 0;
+        for (String queue : queues) {
+            try {
+                max = Math.max(max,backend.getMonitor(queue).getLoad(node));
+            } catch (QueueManagerException e) {
+                LOG.log(Level.WARNING,"Queue '"+queue+"' has dissappeared.");
+            }
+        }
+        return max;
+    }
+
+    /* Gathers the nodes of every backend monitor into an insertion-ordered set, then returns them as a list.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodes()
+     */
+    @Override
+    public List<ResourceNode> getNodes() throws MonitorException {
+        Set<ResourceNode> set = new LinkedHashSet<ResourceNode>();
+        for (Monitor mon:this.backend.getMonitors()) {
+            for (Object res:mon.getNodes()) {
+                set.add((ResourceNode)res);
+            }
+        }
+        return new LinkedList<ResourceNode>(set);
+    }
+
+    /* Returns the first non-null answer from the backend monitors, or null when none of them knows the id.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodeById(java.lang.String)
+     */
+    @Override
+    public ResourceNode getNodeById(String nodeId) throws MonitorException {
+        ResourceNode node = null;
+        Iterator<Monitor> imon = this.backend.getMonitors().iterator();
+        while(imon.hasNext() && (node = imon.next().getNodeById(nodeId)) == null) {}
+        return node;
+    }
+
+    /* Returns the first non-null answer from the backend monitors, or null when none of them knows the URL.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodeByURL(java.net.URL)
+     */
+    @Override
+    public ResourceNode getNodeByURL(URL ipAddr) throws MonitorException {
+        ResourceNode node = null;
+        Iterator<Monitor> imon = this.backend.getMonitors().iterator();
+        while(imon.hasNext() && (node = imon.next().getNodeByURL(ipAddr)) == null) {}
+        return node;
+    }
+
+    /* Reduces the load on the monitor of every queue containing the node; true only if every monitor returned true.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#reduceLoad(org.apache.oodt.cas.resource.structs.ResourceNode, int)
+     */
+    @Override
+    public boolean reduceLoad(ResourceNode node, int loadValue)
+            throws MonitorException {
+        List<String> queues = queuesForNode(node);
+        boolean ret = true;
+        for (String queue:queues) {
+            try {
+                ret &= backend.getMonitor(queue).reduceLoad(node, loadValue);
+            } catch (QueueManagerException e) {
+                LOG.log(Level.SEVERE,"Queue '"+queue+"' has dissappeared.");
+                throw new MonitorException(e);
+            }
+        }
+        return ret;
+    }
+
+    /* Assigns the load on the monitor of every queue containing the node; true only if every monitor returned true.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#assignLoad(org.apache.oodt.cas.resource.structs.ResourceNode, int)
+     */
+    @Override
+    public boolean assignLoad(ResourceNode node, int loadValue)
+            throws MonitorException {
+        List<String> queues = queuesForNode(node);
+        boolean ret = true;
+        for (String queue:queues) {
+            try {
+                ret &= backend.getMonitor(queue).assignLoad(node, loadValue);
+            } catch (QueueManagerException e) {
+                LOG.log(Level.SEVERE,"Queue '"+queue+"' has dissappeared.");
+                throw new MonitorException(e);
+            }
+        }
+        return ret;
+    }
+
+    /* Adds the node to the monitor of every queue that the queue manager lists the node under.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#addNode(org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public void addNode(ResourceNode node) throws MonitorException {
+        List<String> queues = queuesForNode(node);
+        for (String queue:queues) {
+            try {
+                backend.getMonitor(queue).addNode(node);
+            } catch (QueueManagerException e) {
+                LOG.log(Level.SEVERE,"Queue '"+queue+"' has dissappeared.");
+                throw new MonitorException(e);
+            }
+        }
+    }
+
+    /* Asks every backend monitor to remove the node with the given id.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#removeNodeById(java.lang.String)
+     */
+    @Override
+    public void removeNodeById(String nodeId) throws MonitorException {
+        for (Monitor mon:this.backend.getMonitors()) {
+            mon.removeNodeById(nodeId);
+        }
+    }
+    /**
+     * Gets the queues that are associated with a particular node.
+     * @param node - node which queues are needed for
+     * @return list of queue names on that node; empty when the queues cannot be listed
+     */
+    private List<String> queuesForNode(ResourceNode node) {
+        List<String> ret = new LinkedList<String>();
+        //Get list of queues; starts empty so a failed listing skips the search instead of hitting null
+        List<String> queues = new LinkedList<String>();
+        try
+        {
+            queues = qManager.getQueues();
+        } catch (QueueManagerException e) {
+            LOG.log(Level.SEVERE, "Cannot list queues.");
+        }
+        //Search each queue to see if it contains given node
+        for (String queue : queues) {
+            try
+            {
+                if (qManager.getNodes(queue).contains(node.getNodeId())) {
+                    ret.add(queue);
+                }
+            } catch(QueueManagerException e) {
+                LOG.log(Level.SEVERE, "Queue '"+queue+"' has dissappeared.");
+            }
+        }
+        return ret;
+    }
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.mux;
+
+//JDKimports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+
+
+
+
+//OODT imports
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
+import org.apache.oodt.cas.resource.scheduler.Scheduler;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
+
+/**
+ * This scheduler multiplexes between multiple schedulers based on the "queue":
+ * each job is handed to the scheduler of the backend registered for the job's queue name.
+ *
+ * @author starchmd
+ * @version $Revision$
+ */
+public class QueueMuxScheduler implements Scheduler {
+
+    private static final Logger LOG = Logger.getLogger(QueueMuxScheduler.class.getName());
+
+    private BackendManager backend;
+    private JobQueue queue;
+    private float waitTime = -1;
+
+    //Manages other queue-muxing components
+    private QueueMuxBatchManager batch;
+    private QueueMuxMonitor mon;
+    private QueueManager qManager;
+
+    /**
+     * ctor; the polling interval is read from the system property
+     * "org.apache.oodt.cas.resource.scheduler.wait.seconds" (default "20").
+     * @param backend - Backend manager to handle the many different backends.
+     * @param qm - queue manager, also handed to the mux monitor created here
+     * @param jq - job queue polled by run()
+     */
+    public QueueMuxScheduler(BackendManager backend, QueueManager qm, JobQueue jq) {
+        String waitStr = System.getProperty("org.apache.oodt.cas.resource.scheduler.wait.seconds", "20");
+        waitTime = Float.parseFloat(waitStr);
+        this.queue = jq;
+        this.qManager = qm;
+        this.backend = backend;
+        //Required, so make them here
+        batch = new QueueMuxBatchManager(backend);
+        mon = new QueueMuxMonitor(backend,qm);
+    }
+
+    /*
+     * Sleeps for the wait interval, then schedules at most one queued job per pass; ends when interrupted.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        //Loop until the thread is interrupted
+        while (true) {
+            try {
+                Thread.sleep((long) (waitTime * 1000.0));
+            } catch (InterruptedException e) {
+                //Restore the flag and stop: looping on with the flag set would make
+                //every later sleep() throw at once, turning this loop into a busy spin
+                Thread.currentThread().interrupt();
+                return;
+            }
+            //You have jobs
+            if (!queue.isEmpty()) {
+                JobSpec job = null;
+                try {
+                    job = queue.getNextJob();
+                    LOG.log(Level.INFO, "Scheduling job: ["+ job.getJob().getId()+ "] for execution");
+                    schedule(job);
+                } catch (JobQueueException je) {
+                    LOG.log(Level.WARNING,"Error getting job from queue: "
+                            + je.getLocalizedMessage());
+                } catch (SchedulerException se) {
+                    LOG.log(Level.WARNING,"Error occured scheduling job: "+se.getLocalizedMessage());
+                    try {
+                        queue.requeueJob(job);
+                    } catch (JobQueueException je) {
+                        LOG.log(Level.WARNING,"Error requeueing job: "+je.getLocalizedMessage());
+                        LOG.log(Level.WARNING,"Previous error caused by: "+se.getLocalizedMessage());
+                    }
+                }
+            }
+        }
+    }
+
+    /*
+     * Hands the job to the scheduler of its queue; a failed backend lookup becomes a SchedulerException.
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#schedule(org.apache.oodt.cas.resource.structs.JobSpec)
+     */
+    public synchronized boolean schedule(JobSpec spec)
+            throws SchedulerException {
+        LOG.log(Level.FINE, "Spec: "+spec+" Job: "+spec.getJob()+" Backend:"+backend);
+        //Route to the scheduler registered for the job's queue
+        String queue = spec.getJob().getQueueName();
+        try {
+            return backend.getScheduler(queue).schedule(spec);
+        } catch (QueueManagerException e) {
+            LOG.log(Level.WARNING,"Exception occuered: "+e.getLocalizedMessage());
+            throw new SchedulerException(e);
+        }
+    }
+
+    /*
+     * Returns the queue-mux batch manager created in the constructor.
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getBatchmgr()
+     */
+    public Batchmgr getBatchmgr() {
+        return batch;
+    }
+
+    /*
+     * Returns the queue-mux monitor created in the constructor.
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getMonitor()
+     */
+    public Monitor getMonitor() {
+        return mon;
+    }
+
+    /*
+     * Returns the job queue passed to the constructor.
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getJobQueue()
+     */
+    public JobQueue getJobQueue() {
+        return this.queue;
+    }
+
+    /*
+     * Returns the queue manager passed to the constructor.
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getQueueManager()
+     */
+    public QueueManager getQueueManager() {
+        return qManager;
+    }
+
+    /*
+     * Asks the scheduler of the job's queue for an available node; a failed lookup becomes a SchedulerException.
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(org.apache.oodt.cas.resource.structs.JobSpec)
+     */
+    public synchronized ResourceNode nodeAvailable(JobSpec spec)
+            throws SchedulerException {
+        String queue = spec.getJob().getQueueName();
+        try {
+            return backend.getScheduler(queue).nodeAvailable(spec);
+        } catch (QueueManagerException e) {
+            LOG.log(Level.WARNING,"Exception occuered: "+e.getLocalizedMessage());
+            throw new SchedulerException(e);
+        }
+    }
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.mux;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory;
+import org.apache.oodt.cas.resource.queuerepo.XmlQueueRepositoryFactory;
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
+import org.apache.oodt.cas.resource.scheduler.Scheduler;
+import org.apache.oodt.cas.resource.scheduler.SchedulerFactory;
+import org.apache.oodt.cas.resource.structs.exceptions.RepositoryException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+/**
+ * This class acts as a factory for the whole queue-mux
+ * set of classes.
+ *
+ * @author starchmd
+ */
+public class QueueMuxSchedulerFactory implements SchedulerFactory {
+
+    private static final Logger LOG = Logger.getLogger(QueueMuxSchedulerFactory.class.getName());
+
+    BackendManager backend;
+    QueueManager qManager;
+    JobQueue jobQueue;
+    /**
+     * ctor; loads the backend manager, queue manager and job queue (backend stays null if its load fails)
+     */
+    public QueueMuxSchedulerFactory() {
+        //Load backend manager; the default names a factory class, like the two sibling defaults below
+        String backRepo = System.getProperty("resource.backend.mux.repository",
+                XmlBackendRepositoryFactory.class.getCanonicalName());
+        try {
+            backend = GenericResourceManagerObjectFactory.getBackendRepositoryFromFactory(backRepo).load();
+        } catch (RepositoryException e) {
+            LOG.log(Level.SEVERE,"Error loading backend repository: "+e.getMessage(),e);
+            backend = null;
+        }
+        //Load user-specified queue factory
+        String qFact = System.getProperty("org.apache.oodt.cas.resource.queues.repo.factory",
+                XmlQueueRepositoryFactory.class.getCanonicalName());
+        qManager = GenericResourceManagerObjectFactory.getQueueRepositoryFromFactory(
+                qFact).loadQueues();
+        //Load job queue
+        String jobFact = System.getProperty("resource.jobqueue.factory",
+                JobStackJobQueueFactory.class.getCanonicalName());
+        jobQueue = GenericResourceManagerObjectFactory
+                .getJobQueueServiceFromFactory(jobFact);
+    }
+
+    @Override
+    public Scheduler createScheduler() {
+        return new QueueMuxScheduler(this.backend, this.qManager, this.jobQueue);
+    }
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.mux;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.scheduler.Scheduler;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+/**
+ * This manager keeps track of the mux-able backends for the resource manager.
+ * It effectively maps a queue to the backend that this queue feeds.
+ *
+ * It uses a private BackendSet to keep track of everything.
+ *
+ * For reference, a backend is a set of the following:
+ * 1. Batch manager, responsible for running jobs
+ * 2. Scheduler, responsible for scheduling a job to run
+ * 3. Monitor, responsible for managing nodes
+ *
+ * @author starchmd
+ */
+public class StandardBackendManager implements BackendManager {
+  Map<String,BackendSet> queueToBackend = new HashMap<String,BackendSet>();
+
+  /**
+   * Add in a backend set to this manager. Adding a set for a queue that is
+   * already registered replaces the previous set.
+   * @param queue - queue that maps to the given monitor, batchmgr, and scheduler
+   * @param monitor - monitor used for this set
+   * @param batchmgr - batch manager for this set
+   * @param scheduler - scheduler for this set
+   */
+  public void addSet(String queue,Monitor monitor, Batchmgr batchmgr, Scheduler scheduler) {
+    queueToBackend.put(queue, new BackendSet(monitor,batchmgr,scheduler));
+  }
+  /**
+   * Return monitor for the given queue.
+   * @param queue - queue to check
+   * @return monitor
+   * @throws QueueManagerException when queue does not exist
+   */
+  public Monitor getMonitor(String queue) throws QueueManagerException {
+    return lookup(queue).monitor;
+  }
+  /**
+   * Return batch manager for the given queue.
+   * @param queue - queue to check
+   * @return batchmgr
+   * @throws QueueManagerException when queue does not exist
+   */
+  public Batchmgr getBatchmgr(String queue) throws QueueManagerException {
+    return lookup(queue).batchmgr;
+  }
+  /**
+   * Return scheduler for the given queue.
+   * @param queue - queue to check
+   * @return scheduler
+   * @throws QueueManagerException when queue does not exist
+   */
+  public Scheduler getScheduler(String queue) throws QueueManagerException {
+    return lookup(queue).scheduler;
+  }
+  /**
+   * Return a list of all monitors, one entry per registered queue.
+   * @return list of all monitors
+   */
+  public List<Monitor> getMonitors() {
+    List<Monitor> monitors = new LinkedList<Monitor>();
+    for (BackendSet set : queueToBackend.values()) {
+      monitors.add(set.monitor);
+    }
+    return monitors;
+  }
+  /**
+   * Find the backend set registered for a queue.
+   * @param queue - queue to check
+   * @return the backend set for the queue, never null
+   * @throws QueueManagerException when queue does not exist
+   */
+  private BackendSet lookup(String queue) throws QueueManagerException {
+    BackendSet set = queueToBackend.get(queue);
+    if (set == null) {
+      throw new QueueManagerException("Queue '" + queue + "' does not exist");
+    }
+    return set;
+  }
+  /**
+   * Class that holds a set of the three backend pieces.
+   * Private class, because no accessor/modifiers have been
+   * created(public members). Acts like a struct.
+   *
+   * Declared static: it never touches the enclosing manager, so it should
+   * not carry a hidden reference to it.
+   *
+   * @author starchmd
+   */
+  private static final class BackendSet {
+    public final Monitor monitor;
+    public final Batchmgr batchmgr;
+    public final Scheduler scheduler;
+
+    public BackendSet(Monitor monitor, Batchmgr batchmgr, Scheduler scheduler) {
+      this.monitor = monitor;
+      this.batchmgr = batchmgr;
+      this.scheduler = scheduler;
+    }
+  }
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.mux;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.resource.scheduler.Scheduler;
+import org.apache.oodt.cas.resource.structs.exceptions.RepositoryException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.commons.xml.XMLUtils;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Class to load BackendManager from XML file.
+ *
+ * The file is expected to contain "queue" elements, each carrying a "name"
+ * attribute and "scheduler", "monitor" and "batchmgr" tags whose "factory"
+ * attribute names the factory class to use for that queue.
+ *
+ * @author starchmd
+ */
+public class XmlBackendRepository implements BackendRepository {
+
+  private static final Logger LOG = Logger.getLogger(XmlBackendRepository.class.getName());
+  private final String uri;
+
+  //Constants
+  private static final String SCHEDULER = "scheduler";
+  private static final String BATCHMGR = "batchmgr";
+  private static final String MONITOR = "monitor";
+
+  private static final String MONITOR_PROPERTY = "resource.monitor.factory";
+  private static final String BATCHMGR_PROPERTY = "resource.batchmgr.factory";
+
+  /**
+   * Ctor
+   * @param uri - uri of XML file containing mapping
+   * @throws NullPointerException if uri is null
+   */
+  public XmlBackendRepository(String uri) {
+    if (uri == null)
+      throw new NullPointerException("URI for queue-to-backend xml file cannot be null");
+    this.uri = uri;
+  }
+  /**
+   * Reads the queue-to-backend mapping and builds one backend set per queue.
+   *
+   * While a queue's scheduler is being built, the monitor and batchmgr
+   * factory system properties are temporarily overridden with the values from
+   * the XML; both are restored to their original values afterwards, also when
+   * loading fails.
+   * NOTE(review): these are JVM-wide properties, so concurrent loads would
+   * presumably interfere with each other - confirm before calling in parallel.
+   *
+   * @return backend manager holding a set for every queue in the file
+   * @throws RepositoryException if the URI is malformed or does not name a
+   *         local file, the file is missing or unparsable, or a queue does not
+   *         define exactly one scheduler with a factory
+   * @see org.apache.oodt.cas.resource.mux.BackendRepository#load()
+   */
+  @Override
+  public BackendManager load() throws RepositoryException {
+    LOG.log(Level.INFO,"Reading backend set manager from: "+this.uri);
+    BackendManager bm = new StandardBackendManager();
+    String origMon = System.getProperty(MONITOR_PROPERTY);
+    String origBat = System.getProperty(BATCHMGR_PROPERTY);
+    InputStream in = null;
+    try {
+      in = new FileInputStream(resolveFile());
+      Document root = XMLUtils.getDocumentRoot(in);
+      if (root == null) {
+        // Guard in case the parser helper reports failure by returning null.
+        throw new RepositoryException("Could not parse XML document: "+this.uri);
+      }
+      NodeList list = root.getElementsByTagName("queue");
+      if (list != null && list.getLength() > 0) {
+        for (int k = 0; k < list.getLength(); k++) {
+          Element node = (Element)list.item(k);
+          String queue = node.getAttribute("name");
+          //Set properties for batch and monitor factories
+          //So scheduler builds as repository specifies
+          overrideFactoryProperty(queue, node, MONITOR, MONITOR_PROPERTY);
+          overrideFactoryProperty(queue, node, BATCHMGR, BATCHMGR_PROPERTY);
+          //Build scheduler
+          Scheduler sch = getScheduler(queue,node);
+          bm.addSet(queue, sch.getMonitor(), sch.getBatchmgr(), sch);
+          //Reset Properties for next item
+          resetAlteredProperty(MONITOR_PROPERTY,origMon);
+          resetAlteredProperty(BATCHMGR_PROPERTY,origBat);
+        }
+      }
+    } catch(FileNotFoundException e) {
+      LOG.log(Level.SEVERE,"File not found: "+this.uri+" from working dir: "+new File(".").getAbsolutePath());
+      throw new RepositoryException(e);
+    } catch (ClassCastException e) {
+      LOG.log(Level.SEVERE,"Queue tag must represent XML element.");
+      throw new RepositoryException(e);
+    } finally {
+      closeQuietly(in);
+      resetAlteredProperty(MONITOR_PROPERTY,origMon);
+      resetAlteredProperty(BATCHMGR_PROPERTY,origBat);
+    }
+
+    return bm;
+  }
+  /**
+   * Turns the configured URI into a local file handle.
+   * @return file named by the URI
+   * @throws RepositoryException if the URI is malformed, or is not a URI that
+   *         java.io.File accepts (e.g. relative, or not a "file" URI)
+   */
+  private File resolveFile() throws RepositoryException {
+    try {
+      return new File(new URI(this.uri));
+    } catch (URISyntaxException e) {
+      LOG.log(Level.SEVERE,"Malformed URI: "+this.uri);
+      throw new RepositoryException(e);
+    } catch (IllegalArgumentException e) {
+      // File(URI) rejects URIs that are not absolute, hierarchical "file" URIs
+      LOG.log(Level.SEVERE,"URI does not name a local file: "+this.uri);
+      throw new RepositoryException(e);
+    }
+  }
+  /**
+   * Closes the stream if it was opened; a failure to close is only logged
+   * because the document has already been read at that point.
+   * @param in - stream to close, can be null
+   */
+  private static void closeQuietly(InputStream in) {
+    if (in == null) {
+      return;
+    }
+    try {
+      in.close();
+    } catch (IOException e) {
+      LOG.log(Level.FINE,"Unable to close queue-to-backend xml stream: "+e.getMessage(),e);
+    }
+  }
+  /**
+   * Points a factory system property at the factory named in the XML for the
+   * given tag. When the queue does not define that tag the property is left
+   * untouched, so the existing system property is used.
+   * @param queue - current queue, for error reporting
+   * @param node - node that is being read
+   * @param tag - tag holding the factory, i.e. "monitor" or "batchmgr"
+   * @param property - system property to override
+   */
+  private static void overrideFactoryProperty(String queue,Element node,String tag,String property) {
+    try {
+      String factory = getFactoryAttribute(queue, node, tag);
+      LOG.log(Level.INFO,"Setting "+tag+" factory property to: "+factory);
+      System.setProperty(property, factory);
+    } catch (RepositoryException e) {
+      // Deliberate best-effort: a missing tag is not an error for these two.
+      LOG.log(Level.INFO, "No "+tag+" factory for queue "+queue+", using system property.");
+    }
+  }
+  /**
+   * Resets a property. Allows nulls
+   * @param prop - property name to reset
+   * @param value - value to reset to, can be null (clears the property)
+   */
+  private static void resetAlteredProperty(String prop,String value) {
+    if (value == null) {
+      System.clearProperty(prop);
+      return;
+    }
+    System.setProperty(prop,value);
+  }
+
+  /**
+   * Get scheduler from XML
+   * @param queue - current queue, for error reporting
+   * @param node - node that is being read
+   * @return newly constructed Scheduler
+   * @throws RepositoryException if the scheduler factory is missing or no
+   *         scheduler could be created from it
+   */
+  private static Scheduler getScheduler(String queue,Element node) throws RepositoryException {
+    String factory = getFactoryAttribute(queue, node, SCHEDULER);
+    LOG.log(Level.INFO,"Loading scheduler from: "+factory);
+    Scheduler sch = GenericResourceManagerObjectFactory.getSchedulerServiceFromFactory(factory);
+    if (sch != null)
+      return sch;
+    throw new RepositoryException("Could not instantiate from: "+factory);
+  }
+  /**
+   * Pull out the factory attribute from tag with given name.
+   * @param queue - current queue, for error reporting
+   * @param elem - element that contains tags as descendants
+   * @param tag - string name of tag looked for. i.e. "monitor"
+   * @return name of factory class, never empty
+   * @throws RepositoryException - thrown if more than one tag matches, no tag matches,
+   *         the factory attribute is unset, or other error
+   */
+  private static String getFactoryAttribute(String queue,Element elem, String tag) throws RepositoryException {
+    NodeList children = elem.getElementsByTagName(tag);
+    try {
+      String attr = "";
+      if (children.getLength() == 1) {
+        attr = ((Element)children.item(0)).getAttribute("factory");
+      }
+      // Compare by content: an unset attribute comes back as an empty string,
+      // which is not guaranteed to be the same object as the "" literal.
+      if (attr == null || attr.isEmpty()) {
+        throw new RepositoryException("Could not find exactly one "+tag+", with factory set, in queue: "+queue);
+      }
+      return attr;
+    } catch (ClassCastException e) {
+      throw new RepositoryException("Tag "+tag+" does not represent XML element in queue: "+queue,e);
+    }
+  }
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.mux;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.util.PathUtils;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * The XML Backend Repository Factory: creates an {@link XmlBackendRepository}
+ * from the location given by a system property.
+ * </p>
+ */
+public class XmlBackendRepositoryFactory implements BackendRepositoryFactory {
+
+  private static final Logger LOG = Logger.getLogger(XmlBackendRepositoryFactory.class.getName());
+
+  /* system property holding the URI of the queue-to-backend xml file */
+  private static final String QUEUE_TO_BACKEND_PROPERTY = "resource.backend.mux.xmlrepository.queuetobackend";
+
+  /**
+   * Create the backend repository (xml). The URI of the mapping file is read
+   * from the system property and environment variables in it are replaced.
+   * @return the newly minted backend repository, or null (after logging the
+   *         reason) when the property is not set or the URI cannot be resolved
+   */
+  @Override
+  public XmlBackendRepository createBackendRepository() {
+    String uri = System.getProperty(QUEUE_TO_BACKEND_PROPERTY);
+    if (uri == null) {
+      // Check explicitly so the log names the missing property instead of
+      // reporting the message of a NullPointerException raised further down.
+      LOG.log( Level.SEVERE,"Failed to create XmlBackendRepository: system property "
+          + QUEUE_TO_BACKEND_PROPERTY + " is not set");
+      return null;
+    }
+    try {
+      /* do env var replacement */
+      uri = PathUtils.replaceEnvVariables(uri);
+      return new XmlBackendRepository(uri);
+    } catch (NullPointerException e) {
+      // Kept so callers still get null, not an exception, should the
+      // replacement yield no usable URI.
+      LOG.log( Level.SEVERE,"Failed to create XmlBackendRepository: "+ e.getMessage(), e);
+      return null;
+    }
+  }
+
+}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.structs.exceptions;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * Checked exception signalling that a
+ * {@link org.apache.oodt.cas.resource.mux.BackendRepository} ran into an error.
+ * </p>
+ */
+public class RepositoryException extends Exception {
+
+  /* serial version UID */
+  private static final long serialVersionUID = 4568261126290589269L;
+
+  /**
+   * Creates an exception carrying neither a detail message nor a cause.
+   */
+  public RepositoryException() {
+    super();
+  }
+
+  /**
+   * Creates an exception with a detail message.
+   *
+   * @param message
+   *          description of what went wrong
+   */
+  public RepositoryException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception wrapping the underlying failure.
+   *
+   * @param cause
+   *          the failure that triggered this exception
+   */
+  public RepositoryException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Creates an exception with a detail message and the underlying failure.
+   *
+   * @param message
+   *          description of what went wrong
+   * @param cause
+   *          the failure that triggered this exception
+   */
+  public RepositoryException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java?rev=1634142&r1=1634141&r2=1634142&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java Fri Oct 24 21:38:01 2014
@@ -33,6 +33,8 @@ import org.apache.oodt.cas.resource.moni
import org.apache.oodt.cas.resource.monitor.MonitorFactory;
import org.apache.oodt.cas.resource.monitor.ganglia.loadcalc.LoadCalculator;
import org.apache.oodt.cas.resource.monitor.ganglia.loadcalc.LoadCalculatorFactory;
+import org.apache.oodt.cas.resource.mux.BackendRepository;
+import org.apache.oodt.cas.resource.mux.BackendRepositoryFactory;
import org.apache.oodt.cas.resource.noderepo.NodeRepository;
import org.apache.oodt.cas.resource.noderepo.NodeRepositoryFactory;
import org.apache.oodt.cas.resource.queuerepo.QueueRepository;
@@ -162,7 +164,42 @@ public final class GenericResourceManage
return null;
}
+ /**
+ * Creates a new {@link BackendRepository} implementation from the given
+ * {@link BackendRepositoryFactory} class name.
+ *
+ * @param backendRepositoryFactory
+ * The class name of the {@link BackendRepositoryFactory} to use to create new
+ * {@link BackendRepository}s.
+ * @return A new implementation of a {@link BackendRepository}.
+ */
+ public static BackendRepository getBackendRepositoryFromFactory(String backendRepositoryFactory) {
+ Class clazz = null;
+ BackendRepositoryFactory factory = null;
+
+ try {
+ clazz = Class.forName(backendRepositoryFactory);
+ factory = (BackendRepositoryFactory) clazz.newInstance();
+ return factory.createBackendRepository();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "ClassNotFoundException when loading backend repository factory class "
+ + backendRepositoryFactory + " Message: " + e.getMessage());
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "InstantiationException when loading backend repository factory class "
+ + backendRepositoryFactory + " Message: " + e.getMessage());
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "IllegalAccessException when loading backend repository factory class "
+ + backendRepositoryFactory + " Message: " + e.getMessage());
+ }
+ return null;
+ }
/**
* Creates a new {@link NodeRepository} implementation from the given
* {@link QueueRepositoryFactory} class name.
Added: oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml (added)
+++ oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml Fri Oct 24 21:38:01 2014
@@ -0,0 +1,24 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more contributor
+license agreements. See the NOTICE.txt file distributed with this work for
+additional information regarding copyright ownership. The ASF licenses this
+file to you under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
+-->
+<cas:queue-to-backend-mapping xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas">
+ <queue name="example">
+ <scheduler factory="org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory"/>
+ <monitor factory="org.apache.oodt.cas.resource.monitor.AssignmentMonitorFactory"/>
+ <batchmgr factory="org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchMgrFactory"/>
+ </queue>
+</cas:queue-to-backend-mapping>
Modified: oodt/trunk/resource/src/main/resources/resource.properties
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/resource.properties?rev=1634142&r1=1634141&r2=1634142&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/resources/resource.properties (original)
+++ oodt/trunk/resource/src/main/resources/resource.properties Fri Oct 24 21:38:01 2014
@@ -31,6 +31,11 @@ resource.jobqueue.factory = org.apache.o
# resource job repository factory
resource.jobrepo.factory = org.apache.oodt.cas.resource.jobrepo.MemoryJobRepositoryFactory
+# For queue-multiplexing scheduler
+resource.backend.mux.repository = org.apache.oodt.cas.resource.mux.XmlBackendRepositoryFactory
+resource.backend.mux.xmlrepository.queuetobackend = file://[HOME]/queue-to-backend.xml
+
+
# node repository factory
org.apache.oodt.cas.resource.nodes.repo.factory = org.apache.oodt.cas.resource.noderepo.XmlNodeRepositoryFactory
Added: oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java (added)
+++ oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.mux;
+
+//OODT imports
+import org.apache.oodt.cas.resource.mux.mocks.MockBatchManager;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * Test Suite for the {@link QueueMuxBatchManager} service
+ * </p>.
+ */
+public class TestQueueMuxBatchmgr extends TestCase {
+
+  private QueueMuxBatchManager queue;
+  private MockBatchManager mock1;
+  private MockBatchManager mock2;
+
+  /**
+   * Registers one mock batch manager per queue ("queue-1" and "queue-2") and
+   * wraps them in the multiplexing batch manager under test. No backend is
+   * registered for "queue-3", which the tests use as the unknown queue.
+   */
+  @Override
+  protected void setUp() {
+    BackendManager back = new StandardBackendManager();
+    back.addSet("queue-1", null,(mock1 = new MockBatchManager()), null);
+    back.addSet("queue-2", null,(mock2 = new MockBatchManager()), null);
+    queue = new QueueMuxBatchManager(back);
+  }
+
+  /**
+   * Jobs must be handed to the mock backend of their own queue, and an
+   * unknown queue must be rejected.
+   * @throws JobExecutionException on an unexpected failure, which fails the test
+   */
+  public void testExecuteRemotely() throws JobExecutionException {
+    //Test that the jobs are put in separate mock-backends based on queues
+    ResourceNode node1 = new ResourceNode();
+    ResourceNode node2 = new ResourceNode();
+
+    JobSpec spec1 = this.getSpecFromQueue("queue-1");
+    queue.executeRemotely(spec1, node1);
+
+    JobSpec spec2 = this.getSpecFromQueue("queue-2");
+    queue.executeRemotely(spec2, node2);
+    //Yes...use reference equality, as these must be the exact same object
+    TestCase.assertSame(spec1,mock1.getCurrentJobSpec());
+    TestCase.assertSame(spec2,mock2.getCurrentJobSpec());
+    TestCase.assertSame(node1,mock1.getCurrentResourceNode());
+    TestCase.assertSame(node2,mock2.getCurrentResourceNode());
+    //Throws exception on bad queue
+    try {
+      queue.executeRemotely(this.getSpecFromQueue("queue-3"),node1);
+      TestCase.fail("Failed to throw JobExecutionException on unknown queue.");
+    } catch(JobExecutionException expected) {
+      //Expected: no backend is registered for queue-3
+    }
+  }
+
+  /**
+   * Killing a job must only affect the backend of that job's queue, and must
+   * report failure for a job of an unknown queue.
+   * @throws JobExecutionException on an unexpected failure, which fails the test
+   */
+  public void testKillJob() throws JobExecutionException {
+    ResourceNode node1 = new ResourceNode();
+    ResourceNode node2 = new ResourceNode();
+
+    JobSpec spec1 = this.getSpecFromQueue("queue-1");
+    queue.executeRemotely(spec1, node1);
+
+    JobSpec spec2 = this.getSpecFromQueue("queue-2");
+    queue.executeRemotely(spec2, node2);
+    //Make sure that one can kill a job, and the other job is running
+    TestCase.assertTrue(queue.killJob(spec1.getJob().getId(), node1));
+    TestCase.assertNull(mock1.getCurrentJobSpec());
+    TestCase.assertSame(spec2,mock2.getCurrentJobSpec());
+    //Make sure kill fails with bad queue
+    TestCase.assertFalse(queue.killJob(this.getSpecFromQueue("queue-3").getJob().getId(), node1));
+  }
+
+  /**
+   * The execution node reported for a job must be the node the job was sent
+   * to, and null for a job of an unknown queue.
+   * @throws JobExecutionException on an unexpected failure, which fails the test
+   */
+  public void testGetExecNode() throws JobExecutionException {
+    ResourceNode node1 = new ResourceNode();
+    ResourceNode node2 = new ResourceNode();
+    node1.setId("Node1-ID");
+    node2.setId("Node2-ID");
+    JobSpec spec1 = this.getSpecFromQueue("queue-1");
+    queue.executeRemotely(spec1, node1);
+
+    JobSpec spec2 = this.getSpecFromQueue("queue-2");
+    queue.executeRemotely(spec2, node2);
+    //Make sure that the execution node is the same
+    TestCase.assertEquals(node1.getNodeId(),queue.getExecutionNode(spec1.getJob().getId()));
+    TestCase.assertEquals(node2.getNodeId(),queue.getExecutionNode(spec2.getJob().getId()));
+    //Returns null, if bad-queue
+    TestCase.assertNull(queue.getExecutionNode(this.getSpecFromQueue("queue-3").getJob().getId()));
+  }
+
+  /**
+   * Builds a job spec whose job belongs to the given queue; the queue name is
+   * also made part of the job id.
+   * @param queue - name of the queue the job is assigned to
+   * @return job spec wrapping the new job
+   */
+  private JobSpec getSpecFromQueue(String queue) {
+    JobSpec spec1 = new JobSpec();
+    Job job1 = new Job();
+    job1.setId("000000100000011-"+queue);
+    job1.setQueueName(queue);
+    spec1.setJob(job1);
+    return spec1;
+  }
+}
Added: oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java (added)
+++ oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.mux;
+
+//OODT imports
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oodt.cas.resource.mux.mocks.MockMonitor;
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * Test Suite for the {@link QueueMuxMonitor} service
+ * </p>.
+ */
+public class TestQueueMuxMonitor extends TestCase {
+
+ private QueueMuxMonitor monitor;
+ private MockMonitor mock1;
+ private MockMonitor mock2;
+ private ResourceNode superfluous;
+ private QueueManager qm;
+ Map<MockMonitor,List<ResourceNode>> map;
+
+ protected void setUp() {
+ try {
+ //Map monitor to nodes list
+ map = new HashMap<MockMonitor,List<ResourceNode>>();
+ List<ResourceNode> nodes1 = getNodesList("mock-1");
+ List<ResourceNode> nodes2 = getNodesList("mock-2");
+ //Backend Manager setup
+ BackendManager back = new StandardBackendManager();
+ back.addSet("queue-1",(mock1 = addMonitor(0,map,nodes1)), null, null);
+ back.addSet("queue-2",(mock2 = addMonitor(5,map,nodes2)), null, null);
+ //Make sure the queue manager is setup
+ qm = new QueueManager();
+ qm.addQueue("queue-1");
+ qm.addQueue("queue-2");
+ qm.addQueue("queue-3");
+ for (ResourceNode rn : nodes1)
+ qm.addNodeToQueue(rn.getNodeId(), "queue-1");
+ for (ResourceNode rn : nodes2)
+ qm.addNodeToQueue(rn.getNodeId(), "queue-2");
+ //Add an extra node to test "unknown queue"
+ qm.addNodeToQueue((superfluous = new ResourceNode("superfluous-1",new URL("http://superfluous-1"),-2)).getNodeId(), "queue-3");
+ monitor = new QueueMuxMonitor(back, qm);
+ } catch (QueueManagerException e) {
+ TestCase.fail("Unanticipated queue manager exception caught: "+e.getMessage());
+ } catch (MalformedURLException e) {
+ TestCase.fail("Unanticipated URL exception caught: "+e.getMessage());
+ }
+ }
+
+ public void testGetLoad() {
+ try {
+ TestCase.assertEquals(mock1.load,monitor.getLoad(map.get(mock1).get(0)));
+ TestCase.assertEquals(mock2.load,monitor.getLoad(map.get(mock2).get(0)));
+
+ /*try {
+ monitor.getLoad(superfluous);
+ TestCase.fail("Exception not thrown for unknown queue.");
+ } catch (MonitorException e) {
+ }*/
+ } catch(MonitorException e) {
+ TestCase.fail("Unanticipated monitor exception caught: "+e.getMessage());
+ }
+ }
+
+ public void testGetNodes() {
+ try {
+ List<ResourceNode> nodes = monitor.getNodes();
+ for (ResourceNode rn :map.get(mock1))
+ TestCase.assertTrue("Node: "+rn.getNodeId()+ " not found.", nodes.contains(rn));
+ for (ResourceNode rn :map.get(mock2))
+ TestCase.assertTrue("Node: "+rn.getNodeId()+ " not found.", nodes.contains(rn));
+ } catch(MonitorException e) {
+ TestCase.fail("Unanticipated monitor exception caught: "+e.getMessage());
+ }
+ }
+
+ public void testGetNodeById() {
+ try {
+ TestCase.assertEquals(map.get(mock1).get(0),monitor.getNodeById("mock-1-1"));
+ TestCase.assertEquals(map.get(mock2).get(0),monitor.getNodeById("mock-2-1"));
+ } catch(MonitorException e) {
+ TestCase.fail("Unanticipated monitor exception caught: "+e.getMessage());
+ }
+ }
+ public void testGetNodeByURL() {
+ try {
+ TestCase.assertEquals(map.get(mock1).get(1),monitor.getNodeByURL(new URL("http://mock-1-2")));
+ TestCase.assertEquals(map.get(mock2).get(1),monitor.getNodeByURL(new URL("http://mock-2-2")));
+ } catch(MonitorException e) {
+ TestCase.fail("Unanticipated monitor exception caught: "+e.getMessage());
+ } catch (MalformedURLException e1) {
+ TestCase.fail("Unanticipated URL exception caught: "+e1.getMessage());
+ }
+ }
+
+ /**
+  * Checks that reducing load through the multiplexing monitor lowers the capacity
+  * of the third node of each mock, and that the superfluous node, whose queue is
+  * unknown, is rejected with a {@link MonitorException}.
+  */
+ public void testReduceLoad() {
+     try {
+         TestCase.assertTrue(monitor.reduceLoad(map.get(mock1).get(2), 5));
+         TestCase.assertTrue(monitor.reduceLoad(map.get(mock2).get(2), 3));
+         // assertEquals takes (expected, actual); with the literal first, a failure
+         // message reports the intended value as "expected".
+         TestCase.assertEquals(25, map.get(mock1).get(2).getCapacity());
+         TestCase.assertEquals(27, map.get(mock2).get(2).getCapacity());
+         try {
+             monitor.reduceLoad(superfluous, 2);
+             TestCase.fail("Exception not thrown for unknown queue.");
+         } catch (MonitorException expected) {
+             // Expected outcome: the superfluous node belongs to an unknown queue.
+         }
+     } catch(MonitorException e) {
+         TestCase.fail("Unanticipated monitor exception caught: "+e.getMessage());
+     }
+ }
+
+ /**
+  * Checks that assigning load through the multiplexing monitor sets the capacity
+  * of the third node of each mock to the assigned value, and that the superfluous
+  * node, whose queue is unknown, is rejected with a {@link MonitorException}.
+  */
+ public void testAssignLoad() {
+     try {
+         TestCase.assertTrue(monitor.assignLoad(map.get(mock1).get(2), 5));
+         TestCase.assertTrue(monitor.assignLoad(map.get(mock2).get(2), 3));
+         // assertEquals takes (expected, actual); with the literal first, a failure
+         // message reports the intended value as "expected".
+         TestCase.assertEquals(5, map.get(mock1).get(2).getCapacity());
+         TestCase.assertEquals(3, map.get(mock2).get(2).getCapacity());
+         try {
+             monitor.assignLoad(superfluous, 2);
+             TestCase.fail("Exception not thrown for unknown queue.");
+         } catch (MonitorException expected) {
+             // Expected outcome: the superfluous node belongs to an unknown queue.
+         }
+     } catch(MonitorException e) {
+         TestCase.fail("Unanticipated monitor exception caught: "+e.getMessage());
+     }
+ }
+
+ /**
+  * Registers a new node under queue-1 and checks that adding it through the
+  * multiplexing monitor hands it to the first mock.
+  */
+ public void testAddNode() {
+     try {
+         ResourceNode fresh = new ResourceNode("a-new-node",null,2);
+         String freshId = fresh.getNodeId();
+         qm.addNodeToQueue(freshId, "queue-1");
+         monitor.addNode(fresh);
+         TestCase.assertEquals(fresh, mock1.getAdded());
+     } catch (QueueManagerException queueProblem) {
+         TestCase.fail("Unanticipated queue manager exception caught: "+queueProblem.getMessage());
+     } catch (MonitorException monitorProblem) {
+         TestCase.fail("Unanticipated monitor exception caught: "+monitorProblem.getMessage());
+     }
+ }
+ /**
+  * Registers a new node under queue-1, adds it through the multiplexing monitor,
+  * removes it again by id and checks that the first mock no longer holds it.
+  * <p>
+  * This method does not carry the {@code test} name prefix; it is kept under its
+  * original name and is run via {@link #testRemoveNodeById()}.
+  */
+ public void removeNodeById() {
+     try {
+         ResourceNode node = new ResourceNode("a-new-node",null,2);
+         qm.addNodeToQueue(node.getNodeId(), "queue-1");
+         monitor.addNode(node);
+         TestCase.assertEquals(node,mock1.getAdded());
+         monitor.removeNodeById(node.getNodeId());
+         TestCase.assertNull(mock1.getAdded());
+     } catch(MonitorException e) {
+         TestCase.fail("Unanticipated monitor exception caught: "+e.getMessage());
+     } catch (QueueManagerException e1) {
+         TestCase.fail("Unanticipated queue manager exception caught: "+e1.getMessage());
+     }
+ }
+
+ /**
+  * Discoverable entry point for the remove-node check.
+  * <p>
+  * The sibling tests carry no annotations, so they are presumably found by their
+  * {@code test} name prefix (JUnit 3 style); without this wrapper a name-based
+  * runner would never execute {@link #removeNodeById()}.
+  */
+ public void testRemoveNodeById() {
+     removeNodeById();
+ }
+
+ /**
+  * Creates a mock monitor over the given nodes and records the pairing in the map.
+  * The first three nodes of the list are passed to the mock as its id, url and
+  * reduce nodes, in that order.
+  *
+  * @param load load value the mock is constructed with
+  * @param map registry that receives the mock-to-nodes entry
+  * @param list nodes served by the mock; must hold at least three entries
+  * @return the newly created mock
+  */
+ private MockMonitor addMonitor(int load,Map<MockMonitor, List<ResourceNode>> map, List<ResourceNode> list) {
+     ResourceNode idNode = list.get(0);
+     ResourceNode urlNode = list.get(1);
+     ResourceNode reduceNode = list.get(2);
+     MockMonitor created = new MockMonitor(load, list, idNode, urlNode, reduceNode);
+     map.put(created, list);
+     return created;
+ }
+ /**
+  * Builds four nodes named {@code prefix-1} to {@code prefix-4}, each with the URL
+  * {@code http://<node id>} and a capacity of ten times its index (10, 20, 30, 40).
+  * A malformed URL fails the running test.
+  *
+  * @param prefix common prefix of the node ids and host names
+  * @return the nodes created, in index order
+  */
+ private List<ResourceNode> getNodesList(String prefix) {
+     List<ResourceNode> created = new LinkedList<ResourceNode>();
+     try {
+         for (int index = 1; index <= 4; index++) {
+             String nodeId = prefix + "-" + index;
+             created.add(new ResourceNode(nodeId, new URL("http://" + nodeId), index * 10));
+         }
+     } catch (MalformedURLException badUrl) {
+         TestCase.fail("Unanticipated URL exception caught: "+badUrl.getMessage());
+     }
+     return created;
+ }
+}
Added: oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java (added)
+++ oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.mux.mocks;
+
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+/**
+ * Mock version of the batch manager for use in tests. It records the job spec and
+ * resource node most recently passed to
+ * {@link #executeRemotely(JobSpec, ResourceNode)} instead of executing anything,
+ * and exposes them through test-only accessors. It SHOULD NOT, and CAN NOT, be
+ * used as a normal batch manager.
+ *
+ * @author starchmd
+ */
+public class MockBatchManager implements Batchmgr {
+
+    /** Job spec recorded by the last execute call; {@code null} when none is recorded. */
+    private JobSpec execJobSpec;
+    /** Resource node recorded by the last execute call; {@code null} when none is recorded. */
+    private ResourceNode execResNode;
+
+    /**
+     * Records the job and node; nothing is actually executed.
+     *
+     * @param job job specification to remember
+     * @param resNode resource node to remember
+     * @return always {@code true}
+     * @throws JobExecutionException declared by the interface; not thrown here
+     */
+    @Override
+    public boolean executeRemotely(JobSpec job, ResourceNode resNode)
+            throws JobExecutionException {
+        this.execJobSpec = job;
+        this.execResNode = resNode;
+        return true;
+    }
+
+    /** No-op: this mock does not use a monitor. */
+    @Override
+    public void setMonitor(Monitor monitor) {}
+
+    /** No-op: this mock does not use a job repository. */
+    @Override
+    public void setJobRepository(JobRepository repository) {}
+
+    /**
+     * Forgets the recorded job and node when the recorded job has the given id.
+     *
+     * @param jobId id of the job to kill
+     * @param node ignored by this mock
+     * @return {@code true} if the recorded job matched and was cleared; {@code false}
+     *         if the id does not match or no job is currently recorded
+     */
+    @Override
+    public boolean killJob(String jobId, ResourceNode node) {
+        // Before any execute call, or after a successful kill, nothing is recorded.
+        // Report "not killed" instead of failing with a NullPointerException.
+        if (this.execJobSpec == null || this.execJobSpec.getJob() == null) {
+            return false;
+        }
+        String recordedId = this.execJobSpec.getJob().getId();
+        if (recordedId != null && recordedId.equals(jobId)) {
+            this.execJobSpec = null;
+            this.execResNode = null;
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns the id of the recorded resource node.
+     *
+     * @param jobId ignored by this mock; the recorded node is reported for any id
+     * @return the recorded node's id, or {@code null} when no node is recorded
+     */
+    @Override
+    public String getExecutionNode(String jobId) {
+        // Guard against the nothing-recorded state rather than throwing.
+        return (this.execResNode == null) ? null : this.execResNode.getNodeId();
+    }
+
+    /*****
+     * The following are test methods to report what jobs are here.
+     *****/
+    /**
+     * Return the current jobspec, for testing purposes.
+     *
+     * @return the recorded job spec, or {@code null} when none is recorded
+     */
+    public JobSpec getCurrentJobSpec() {
+        return this.execJobSpec;
+    }
+
+    /**
+     * Return the current resource node, for testing purposes.
+     *
+     * @return the recorded resource node, or {@code null} when none is recorded
+     */
+    public ResourceNode getCurrentResourceNode() {
+        return this.execResNode;
+    }
+}
Added: oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java?rev=1634142&view=auto
==============================================================================
--- oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java (added)
+++ oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java Fri Oct 24 21:38:01 2014
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.mux.mocks;
+
+import java.net.URL;
+import java.util.List;
+
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+
+public class MockMonitor implements Monitor {
+
+ public int load = -1;
+ List<ResourceNode> nodes;
+ ResourceNode id;
+ ResourceNode url;
+ ResourceNode add;
+ ResourceNode reduce;
+
+ public MockMonitor(int load,List<ResourceNode> nodes, ResourceNode id, ResourceNode url, ResourceNode reduce) {
+ this.load = load;
+ this.nodes = nodes;
+ this.id = id;
+ this.url = url;
+ this.reduce = reduce;
+ }
+
+ @Override
+ public int getLoad(ResourceNode node) throws MonitorException {
+ return load;
+ }
+ @Override
+ public List getNodes() throws MonitorException {
+ return nodes;
+ }
+
+ @Override
+ public ResourceNode getNodeById(String nodeId) throws MonitorException {
+ return id.getNodeId().equals(nodeId)?id:null;
+ }
+
+ @Override
+ public ResourceNode getNodeByURL(URL ipAddr) throws MonitorException {
+ return url.getIpAddr().equals(ipAddr)?url:null;
+ }
+
+ @Override
+ public boolean reduceLoad(ResourceNode node, int loadValue)
+ throws MonitorException {
+ reduce.setCapacity(reduce.getCapacity() - loadValue);
+ return true;
+ }
+
+ @Override
+ public boolean assignLoad(ResourceNode node, int loadValue)
+ throws MonitorException {
+ reduce.setCapacity(loadValue);
+ return true;
+ }
+
+ @Override
+ public void addNode(ResourceNode node) throws MonitorException {
+ this.add = node;
+
+ }
+
+ @Override
+ public void removeNodeById(String nodeId) throws MonitorException {
+ if (this.add.getNodeId().equals(nodeId))
+ this.add = null;
+ }
+
+ public ResourceNode getAdded() {
+ return this.add;
+ }
+}