You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by bf...@apache.org on 2010/12/13 18:28:17 UTC
svn commit: r1045244 - in /oodt/trunk: ./
resource/src/main/java/org/apache/oodt/cas/resource/noderepo/
resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/
resource/src/main/java/org/apache/oodt/cas/resource/scheduler/
resource/src/main/java...
Author: bfoster
Date: Mon Dec 13 17:28:16 2010
New Revision: 1045244
URL: http://svn.apache.org/viewvc?rev=1045244&view=rev
Log:
- resource manager is now Queue aware
----------------------
OODT-77
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepository.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepositoryFactory.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepository.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepositoryFactory.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepository.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepositoryFactory.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepository.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepositoryFactory.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUQueueManager.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/QueueManager.java (with props)
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/QueueManagerException.java (with props)
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/queuerepo/
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/queuerepo/TestXmlQueueRepository.java (with props)
Modified:
oodt/trunk/CHANGES.txt
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUScheduler.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUSchedulerFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java
oodt/trunk/resource/src/main/resources/resource.properties
Modified: oodt/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1045244&r1=1045243&r2=1045244&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Mon Dec 13 17:28:16 2010
@@ -4,6 +4,8 @@ Apache OODT Change Log
Release 0.2 (Current Development)
--------------------------------------------
+* OODT-77 Make resource manager Queue aware (bfoster)
+
* OODT-80 Create Cached JobRepository for cas-resource (bfoster)
* OODT-82 Make resource manager's node ip addresses envReplace-able (bfoster)
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepository.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepository.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepository.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.noderepo;
+
+//OODT imports
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+
+//JDK imports
+import java.util.List;
+
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The Node Repository interface.
+ * </p>
+ */
+public interface NodeRepository {
+
+ public List<ResourceNode> loadNodes();
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepository.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepositoryFactory.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepositoryFactory.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepositoryFactory.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.noderepo;
+
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The Node Repository Factory interface.
+ * </p>
+ */
+public interface NodeRepositoryFactory {
+
+ /**
+ * Creates {@link NodeRepository}
+ * @return the created {@link NodeRepository}
+ */
+ public NodeRepository createNodeRepository();
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/NodeRepositoryFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepository.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepository.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepository.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.noderepo;
+
+//OODT imports
+import org.apache.oodt.commons.xml.XMLUtils;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.util.XmlStructFactory;
+
+//JDK imports
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//DOM imports
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ *
+ * @author woollard
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The XML Node Repository interface.
+ * </p>
+ */
+public class XmlNodeRepository implements NodeRepository {
+
+ private static final Logger LOG = Logger.getLogger(XmlNodeRepository.class
+ .getName());
+
+ private static FileFilter nodesXmlFilter = new FileFilter() {
+ public boolean accept(File pathname) {
+ return pathname.isFile()
+ && pathname.toString().endsWith("nodes.xml");
+ }
+ };
+
+ private List<String> nodesHomeUris = null;
+
+ public XmlNodeRepository(List<String> uris) {
+ this.nodesHomeUris = uris;
+ }
+
+ public List<ResourceNode> loadNodes() {
+ Vector<ResourceNode> nodes = new Vector<ResourceNode>();
+ for (String dirUri : this.nodesHomeUris) {
+ try {
+ File nodesDir = new File(new URI(dirUri));
+ if (nodesDir.isDirectory()) {
+
+ String nodesDirStr = nodesDir.getAbsolutePath();
+
+ if (!nodesDirStr.endsWith("/")) {
+ nodesDirStr += "/";
+ }
+
+ // get all the workflow xml files
+ File[] nodesFiles = nodesDir.listFiles(nodesXmlFilter);
+
+ for (int j = 0; j < nodesFiles.length; j++) {
+
+ String nodesXmlFile = nodesFiles[j].getAbsolutePath();
+ Document nodesRoot = null;
+ try {
+ nodesRoot = XMLUtils
+ .getDocumentRoot(new FileInputStream(
+ nodesFiles[j]));
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ NodeList nodeList = nodesRoot
+ .getElementsByTagName("node");
+ if (nodeList != null)
+ for (int k = 0; k < nodeList.getLength(); k++)
+ nodes.add(XmlStructFactory
+ .getNodes((Element) nodeList.item(k)));
+ }
+ }
+ } catch (URISyntaxException e) {
+ LOG.log(Level.WARNING, "DirUri: " + dirUri
+ + " is not a directory: skipping node loading for it.",
+ e);
+ }
+ }
+ return nodes;
+ }
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepository.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepositoryFactory.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepositoryFactory.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepositoryFactory.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.noderepo;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.util.PathUtils;
+
+//JDK imports
+import java.util.Arrays;
+
+/**
+ *
+ * @author woollard
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The XML Node Repository Factory interface.
+ * </p>
+ */
+public class XmlNodeRepositoryFactory implements NodeRepositoryFactory {
+
+ /*
+ * (non-Javadoc)
+ * @see gov.nasa.jpl.oodt.cas.resource.nodes.NodeRepositoryFactory#createNodeRepository()
+ */
+ public XmlNodeRepository createNodeRepository() {
+ String nodesDirUris = System
+ .getProperty("org.apache.oodt.cas.resource.nodes.dirs");
+
+ if (nodesDirUris != null) {
+ /* do env var replacement */
+ nodesDirUris = PathUtils.replaceEnvVariables(nodesDirUris);
+ String[] dirUris = nodesDirUris.split(",");
+ return new XmlNodeRepository(Arrays.asList(dirUris));
+ }else {
+ return null;
+ }
+ }
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/noderepo/XmlNodeRepositoryFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepository.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepository.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepository.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.queuerepo;
+
+//OODT imports
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
+
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The Queue Repository interface.
+ * </p>
+ */
+public interface QueueRepository {
+
+ public QueueManager loadQueues();
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepository.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepositoryFactory.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepositoryFactory.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepositoryFactory.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.queuerepo;
+
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The Queue Repository Factory interface.
+ * </p>
+ */
+public interface QueueRepositoryFactory {
+
+ public QueueRepository createQueueRepository();
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/QueueRepositoryFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepository.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepository.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepository.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.queuerepo;
+
+//OODT imports
+import org.apache.oodt.commons.xml.XMLUtils;
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
+import org.apache.oodt.cas.resource.util.XmlStructFactory;
+
+//JDK imports
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//DOM imports
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ *
+ * @author woollard
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The XML Queue Repository interface.
+ * </p>
+ */
+public class XmlQueueRepository implements QueueRepository {
+
+ private static FileFilter queuesXmlFilter = new FileFilter() {
+ public boolean accept(File pathname) {
+ return pathname.isFile()
+ && pathname.toString()
+ .endsWith("node-to-queue-mapping.xml");
+ }
+ };
+
+ private static final Logger LOG = Logger.getLogger(XmlQueueRepository.class
+ .getName());
+
+ private List<String> dirUris;
+
+ public XmlQueueRepository(List<String> dirUris) {
+ this.dirUris = dirUris;
+ }
+
+ public QueueManager loadQueues() {
+
+ QueueManager queueManager = new QueueManager();
+
+ if (dirUris != null && dirUris.size() > 0) {
+ for (Iterator i = dirUris.iterator(); i.hasNext();) {
+ String dirUri = (String) i.next();
+
+ try {
+ File nodesDir = new File(new URI(dirUri));
+ if (nodesDir.isDirectory()) {
+
+ String nodesDirStr = nodesDir.getAbsolutePath();
+
+ if (!nodesDirStr.endsWith("/")) {
+ nodesDirStr += "/";
+ }
+
+ // get all the workflow xml files
+ File[] nodesFiles = nodesDir.listFiles(queuesXmlFilter);
+
+ for (int j = 0; j < nodesFiles.length; j++) {
+
+ String nodesXmlFile = nodesFiles[j]
+ .getAbsolutePath();
+ Document nodesRoot = null;
+ try {
+ nodesRoot = XMLUtils
+ .getDocumentRoot(new FileInputStream(
+ nodesFiles[j]));
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ NodeList nodeList = nodesRoot
+ .getElementsByTagName("node");
+
+ if (nodeList != null && nodeList.getLength() > 0) {
+ for (int k = 0; k < nodeList.getLength(); k++) {
+
+ String nodeId = ((Element) nodeList.item(k))
+ .getAttribute("id");
+ Vector assignments = (Vector) XmlStructFactory
+ .getQueueAssignment((Element) nodeList
+ .item(k));
+ for (int l = 0; l < assignments.size(); l++) {
+ try {
+ // make sure queue exists
+ queueManager
+ .addQueue((String) assignments
+ .get(l));
+ // add node to queue
+ queueManager
+ .addNodeToQueue(nodeId,
+ (String) assignments
+ .get(l));
+ } catch (Exception e) {
+ LOG
+ .log(
+ Level.WARNING,
+ "Failed to add node '"
+ + nodeId
+ + "' to queue '"
+ + (String) assignments
+ .get(l)
+ + "' : "
+ + e
+ .getMessage(),
+ e);
+ }
+ }
+ }
+ }
+ }
+ }
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ LOG
+ .log(
+ Level.WARNING,
+ "DirUri: "
+ + dirUri
+ + " is not a directory: skipping node loading for it.");
+ }
+ }
+
+ }
+ return queueManager;
+ }
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepository.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepositoryFactory.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepositoryFactory.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepositoryFactory.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.queuerepo;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.util.PathUtils;
+
+//JDK imports
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * @author woollard
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The XML Queue Repository Factory interface.
+ * </p>
+ */
+public class XmlQueueRepositoryFactory implements QueueRepositoryFactory {
+
+ private static final Logger LOG = Logger
+ .getLogger(XmlQueueRepositoryFactory.class.getName());
+
+ public XmlQueueRepository createQueueRepository() {
+ try {
+ String queuesDirUris = System
+ .getProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs");
+
+ /* do env var replacement */
+ queuesDirUris = PathUtils.replaceEnvVariables(queuesDirUris);
+ String[] dirUris = queuesDirUris.split(",");
+ return new XmlQueueRepository(Arrays.asList(dirUris));
+ } catch (Exception e) {
+ LOG
+ .log(
+ Level.SEVERE,
+ "Failed to create XML Queue Repository (make sure you specify node-to-queue mapping java property 'org.apache.oodt.cas.resource.scheduler.nodetoqueues.dirs') : "
+ + e.getMessage(), e);
+ return null;
+ }
+ }
+
+}
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/queuerepo/XmlQueueRepositoryFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUQueueManager.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUQueueManager.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUQueueManager.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUQueueManager.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.scheduler;
+
+//JDK imports
+import java.util.LinkedHashSet;
+import java.util.Vector;
+
+//OODT imports
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The LRUs Queue to Node Mapping Manager
+ * </p>
+ */
+public class LRUQueueManager extends QueueManager {
+
+ public LRUQueueManager(QueueManager queueManager) throws QueueManagerException {
+ for (String queue : queueManager.getQueues()) {
+ LinkedHashSet<String> nodes = new LinkedHashSet<String>();
+ nodes.addAll(queueManager.getNodes(queue));
+ this.queueToNodesMapping.put(queue, nodes);
+ }
+ }
+
+ public synchronized void usedNode(String queueName, String nodeId) {
+ Vector<String> nodes = new Vector<String>(this.queueToNodesMapping.get(queueName));
+ nodes.remove(nodeId);
+ nodes.add(nodeId);
+ LinkedHashSet<String> nodeSet = new LinkedHashSet<String>();
+ nodeSet.addAll(nodes);
+ this.queueToNodesMapping.put(queueName, nodeSet);
+ }
+
+}
+
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUQueueManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUScheduler.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUScheduler.java?rev=1045244&r1=1045243&r2=1045244&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUScheduler.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUScheduler.java Mon Dec 13 17:28:16 2010
@@ -20,38 +20,23 @@ package org.apache.oodt.cas.resource.sch
//JDKimports
import java.lang.Integer;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
//OODT imports
import org.apache.oodt.cas.resource.jobqueue.JobQueue;
import org.apache.oodt.cas.resource.monitor.Monitor;
import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
-import org.apache.oodt.commons.xml.XMLUtils;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
-import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
-import org.apache.oodt.cas.resource.util.XmlStructFactory;
/**
*
* @author woollard
+ * @author bfoster
* @version $Revision$
*
* <p>
@@ -66,12 +51,8 @@ public class LRUScheduler implements Sch
private static final Logger LOG = Logger.getLogger(LRUScheduler.class
.getName());
- /* list of URI pointers to dirs containing node-to-queue-mapping.xml files */
- private List queuesHomeUris = null;
-
- /* a map of String queueId->List of nodeIds */
- private HashMap queues;
-
+ private LRUQueueManager queueManager;
+
/* the monitor we'll use to check the status of the resources */
private Monitor myMonitor;
@@ -84,20 +65,9 @@ public class LRUScheduler implements Sch
/* our wait time between checking the queue */
private int waitTime = -1;
- private static FileFilter queuesXmlFilter = new FileFilter() {
- public boolean accept(File pathname) {
- return pathname.isFile()
- && pathname.toString()
- .endsWith("node-to-queue-mapping.xml");
- }
- };
-
- public LRUScheduler(List uris, Monitor m, Batchmgr b, JobQueue q) {
- queues = new HashMap();
-
- queuesHomeUris = uris;
- loadNodeMappingInfo(queuesHomeUris);
+ public LRUScheduler(Monitor m, Batchmgr b, JobQueue q, LRUQueueManager qm) {
+ queueManager = qm;
myMonitor = m;
myBatchmgr = b;
myJobQueue = q;
@@ -117,8 +87,7 @@ public class LRUScheduler implements Sch
try {
Thread.currentThread().sleep((long) waitTime * 1000);
- } catch (InterruptedException ignore) {
- }
+ } catch (Exception ignore) {}
if (!myJobQueue.isEmpty()) {
JobSpec exec = null;
@@ -128,7 +97,7 @@ public class LRUScheduler implements Sch
LOG.log(Level.INFO, "Obtained Job: ["
+ exec.getJob().getId()
+ "] from Queue: Scheduling for execution");
- } catch (JobQueueException e) {
+ } catch (Exception e) {
LOG.log(Level.WARNING,
"Error getting next job from JobQueue: Message: "
+ e.getMessage());
@@ -137,7 +106,7 @@ public class LRUScheduler implements Sch
try {
schedule(exec);
- } catch (SchedulerException e) {
+ } catch (Exception e) {
LOG.log(Level.WARNING, "Error scheduling job: ["
+ exec.getJob().getId() + "]: Message: "
+ e.getMessage());
@@ -155,21 +124,20 @@ public class LRUScheduler implements Sch
/*
* (non-Javadoc)
*
- * @see org.apache.oodt.cas.resource.scheduler.Scheduler#schedule(org.apache.oodt.cas.resource.structs.JobSpec)
+ * @see gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#schedule(gov.nasa.jpl.oodt.cas.resource.structs.JobSpec)
*/
public synchronized boolean schedule(JobSpec spec)
throws SchedulerException {
String queueName = spec.getJob().getQueueName();
int load = spec.getJob().getLoadValue().intValue();
- Vector queueNodes = (Vector) queues.get(queueName);
ResourceNode node = nodeAvailable(spec);
if (node != null) {
try {
myMonitor.assignLoad(node, load);
- queueNodes.remove(node.getNodeId());
- queueNodes.add(node.getNodeId());
+ queueManager.usedNode(queueName, node.getNodeId());
+
// assign via batch system
LOG.log(Level.INFO, "Assigning job: ["
+ spec.getJob().getName() + "] to node: ["
@@ -211,7 +179,7 @@ public class LRUScheduler implements Sch
/*
* (non-Javadoc)
*
- * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getBatchmgr()
+ * @see gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getBatchmgr()
*/
public Batchmgr getBatchmgr() {
return myBatchmgr;
@@ -220,7 +188,7 @@ public class LRUScheduler implements Sch
/*
* (non-Javadoc)
*
- * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getMonitor()
+ * @see gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getMonitor()
*/
public Monitor getMonitor() {
return myMonitor;
@@ -229,121 +197,56 @@ public class LRUScheduler implements Sch
/*
* (non-Javadoc)
*
- * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getJobQueue()
+ * @see gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getJobQueue()
*/
public JobQueue getJobQueue() {
return myJobQueue;
}
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getQueueManager()
+ */
+ public QueueManager getQueueManager() {
+ return this.queueManager;
+ }
/*
* (non-Javadoc)
*
- * @see org.apache.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(org.apache.oodt.cas.resource.structs.JobSpec)
+ * @see gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(gov.nasa.jpl.oodt.cas.resource.structs.JobSpec)
*/
public synchronized ResourceNode nodeAvailable(JobSpec spec)
throws SchedulerException {
- String queueName = spec.getJob().getQueueName();
- int load = spec.getJob().getLoadValue().intValue();
-
- Vector queueNodes = (Vector) queues.get(queueName);
- for (int i = 0; i < queueNodes.size(); i++) {
- String nodeId = (String) queueNodes.get(i);
- int nodeLoad = -1;
- ResourceNode resNode = null;
-
- try {
- resNode = myMonitor.getNodeById(nodeId);
- nodeLoad = myMonitor.getLoad(resNode);
- } catch (MonitorException e) {
- LOG
- .log(Level.WARNING, "Exception getting load on "
- + "node: [" + resNode.getNodeId()
- + "]: Message: " + e.getMessage());
- throw new SchedulerException(e.getMessage());
- }
-
- if (load <= nodeLoad) {
- return resNode;
- }
+ try {
+ String queueName = spec.getJob().getQueueName();
+ int load = spec.getJob().getLoadValue().intValue();
+
+ for (String nodeId : queueManager.getNodes(queueName)) {
+ int nodeLoad = -1;
+ ResourceNode resNode = null;
+
+ try {
+ resNode = myMonitor.getNodeById(nodeId);
+ nodeLoad = myMonitor.getLoad(resNode);
+ } catch (MonitorException e) {
+ LOG
+ .log(Level.WARNING, "Exception getting load on "
+ + "node: [" + resNode.getNodeId()
+ + "]: Message: " + e.getMessage());
+ throw new SchedulerException(e.getMessage());
+ }
+
+ if (load <= nodeLoad) {
+ return resNode;
+ }
+ }
+
+ return null;
+ }catch (Exception e) {
+ throw new SchedulerException("Failed to find available node for job spec : " + e.getMessage(), e);
}
-
- return null;
- }
-
- private HashMap loadNodeMappingInfo(List dirUris) {
-
- HashMap resources = new HashMap();
-
- if (dirUris != null && dirUris.size() > 0) {
- for (Iterator i = dirUris.iterator(); i.hasNext();) {
- String dirUri = (String) i.next();
-
- try {
- File nodesDir = new File(new URI(dirUri));
- if (nodesDir.isDirectory()) {
-
- String nodesDirStr = nodesDir.getAbsolutePath();
-
- if (!nodesDirStr.endsWith("/")) {
- nodesDirStr += "/";
- }
-
- // get all the workflow xml files
- File[] nodesFiles = nodesDir.listFiles(queuesXmlFilter);
-
- for (int j = 0; j < nodesFiles.length; j++) {
-
- String nodesXmlFile = nodesFiles[j]
- .getAbsolutePath();
- Document nodesRoot = null;
- try {
- nodesRoot = XMLUtils
- .getDocumentRoot(new FileInputStream(
- nodesFiles[j]));
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- return null;
- }
-
- NodeList nodeList = nodesRoot
- .getElementsByTagName("node");
-
- if (nodeList != null && nodeList.getLength() > 0) {
- for (int k = 0; k < nodeList.getLength(); k++) {
-
- String nodeId = ((Element) nodeList.item(k))
- .getAttribute("id");
- Vector assignments = (Vector) XmlStructFactory
- .getQueueAssignment((Element) nodeList
- .item(k));
- for (int l = 0; l < assignments.size(); l++) {
- if (!queues
- .containsKey((String) assignments
- .get(l))) {
- queues.put((String) assignments
- .get(l), new Vector());
- }
- ((Vector) queues
- .get((String) assignments
- .get(l))).add(nodeId);
- }
- }
- }
- }
- }
- } catch (URISyntaxException e) {
- e.printStackTrace();
- LOG
- .log(
- Level.WARNING,
- "DirUri: "
- + dirUri
- + " is not a directory: skipping node loading for it.");
- }
- }
- }
-
- return resources;
}
}
Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUSchedulerFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUSchedulerFactory.java?rev=1045244&r1=1045243&r2=1045244&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUSchedulerFactory.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/LRUSchedulerFactory.java Mon Dec 13 17:28:16 2010
@@ -19,18 +19,19 @@
package org.apache.oodt.cas.resource.scheduler;
//JAVA imports
-import java.util.Arrays;
-import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
//OODT imports
+import org.apache.oodt.cas.resource.queuerepo.XmlQueueRepositoryFactory;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
-import org.apache.oodt.cas.metadata.util.PathUtils;
import org.apache.oodt.cas.resource.jobqueue.JobQueue;
import org.apache.oodt.cas.resource.monitor.Monitor;
import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
/**
* @author woollard
+ * @author bfoster
* @version $Revision$
*
* <p>
@@ -40,63 +41,67 @@ import org.apache.oodt.cas.resource.batc
*/
public class LRUSchedulerFactory implements SchedulerFactory {
- /*
- * a list of URIs pointing to directories that have the
- * node-to-queue-mapping.xml files
- */
- private List queuesDirList;
-
- /* our monitor */
- private Monitor mon = null;
-
- /* our batchmgr */
- private Batchmgr batcher = null;
-
- /* our job queue */
- private JobQueue queue = null;
-
- public LRUSchedulerFactory() {
- String queuesDirUris = System
- .getProperty("org.apache.oodt.cas.resource.scheduler.nodetoqueues.dirs");
-
- if (queuesDirUris != null) {
- /* do env var replacement */
- queuesDirUris = PathUtils.replaceEnvVariables(queuesDirUris);
- String[] dirUris = queuesDirUris.split(",");
- queuesDirList = Arrays.asList(dirUris);
- }
-
- String batchmgrClassStr = System.getProperty("resource.batchmgr.factory",
- "org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchmgrFactory");
- String monitorClassStr = System.getProperty("resource.monitor.factory",
- "org.apache.oodt.cas.resource.monitor.AssignmentMonitorFactory");
-
- String jobQueueClassStr = System.getProperty("resource.jobqueue.factory",
- "org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory");
-
- batcher = GenericResourceManagerObjectFactory
- .getBatchmgrServiceFromFactory(batchmgrClassStr);
- mon = GenericResourceManagerObjectFactory
- .getMonitorServiceFromFactory(monitorClassStr);
-
- queue = GenericResourceManagerObjectFactory.getJobQueueServiceFromFactory(
- jobQueueClassStr);
-
- // set the monitor for this batcher
- batcher.setMonitor(mon);
-
- // set the job repo for this batcher
- batcher.setJobRepository(queue.getJobRepository());
-
-
- }
-
- public Scheduler createScheduler() {
- if (queuesDirList != null) {
- return new LRUScheduler(queuesDirList, mon, batcher, queue);
- } else {
- return null;
- }
- }
+ private static final Logger LOG = Logger
+ .getLogger(LRUSchedulerFactory.class.getName());
+
+ /*
+ * a list of URIs pointing to directories that have the
+ * node-to-queue-mapping.xml files
+ */
+ private LRUQueueManager queueManager;
+
+ /* our monitor */
+ private Monitor mon = null;
+
+ /* our batchmgr */
+ private Batchmgr batcher = null;
+
+ /* our job queue */
+ private JobQueue queue = null;
+
+ public LRUSchedulerFactory() {
+ String queueRepoFactoryClassStr = System.getProperty(
+ "org.apache.oodt.cas.resource.queues.repo.factory",
+ XmlQueueRepositoryFactory.class.getCanonicalName());
+ String batchmgrClassStr = System
+ .getProperty("resource.batchmgr.factory",
+ "org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchmgrFactory");
+ String monitorClassStr = System
+ .getProperty("resource.monitor.factory",
+ "org.apache.oodt.cas.resource.monitor.AssignmentMonitorFactory");
+
+ String jobQueueClassStr = System
+ .getProperty("resource.jobqueue.factory",
+ "org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory");
+
+ try {
+ queueManager = new LRUQueueManager(
+ GenericResourceManagerObjectFactory
+ .getQueueRepositoryFromFactory(
+ queueRepoFactoryClassStr).loadQueues());
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to create queue manager : "
+ + e.getMessage(), e);
+ queueManager = null;
+ }
+ batcher = GenericResourceManagerObjectFactory
+ .getBatchmgrServiceFromFactory(batchmgrClassStr);
+ mon = GenericResourceManagerObjectFactory
+ .getMonitorServiceFromFactory(monitorClassStr);
+
+ queue = GenericResourceManagerObjectFactory
+ .getJobQueueServiceFromFactory(jobQueueClassStr);
+
+ // set the monitor for this batcher
+ batcher.setMonitor(mon);
+
+ // set the job repo for this batcher
+ batcher.setJobRepository(queue.getJobRepository());
+
+ }
+
+ public Scheduler createScheduler() {
+ return new LRUScheduler(mon, batcher, queue, queueManager);
+ }
}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/QueueManager.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/QueueManager.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/QueueManager.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/QueueManager.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.scheduler;
+
+//OODT imports
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+//JDK imports
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * The Queue to Node Mapping Manager
+ * </p>
+ */
+public class QueueManager {
+
+ protected Map<String, LinkedHashSet<String>> queueToNodesMapping;
+
+ public QueueManager() {
+ this.queueToNodesMapping = new HashMap<String, LinkedHashSet<String>>();
+ }
+
+ public synchronized boolean containsQueue(String queueName) {
+ return this.queueToNodesMapping.containsKey(queueName);
+ }
+
+ public synchronized void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException {
+ if (queueName == null || !this.queueToNodesMapping.containsKey(queueName))
+ throw new QueueManagerException("Queue '" + queueName + "' does not exist");
+
+ // add node to queue
+ LinkedHashSet<String> nodes = this.queueToNodesMapping.get(queueName);
+ if (nodes == null)
+ nodes = new LinkedHashSet<String>();
+ nodes.add(nodeId);
+
+ // put node list back into map
+ this.queueToNodesMapping.put(queueName, nodes);
+ }
+
+ public synchronized void addQueue(String queueName) throws QueueManagerException {
+ if (queueName != null && !this.queueToNodesMapping.containsKey(queueName))
+ this.queueToNodesMapping.put(queueName, new LinkedHashSet<String>());
+ }
+
+ public synchronized List<String> getNodes(String queueName) throws QueueManagerException {
+ if (queueName != null && this.queueToNodesMapping.containsKey(queueName))
+ return new Vector<String>(this.queueToNodesMapping.get(queueName));
+ else
+ throw new QueueManagerException("Queue '" + queueName + "' does not exist");
+ }
+
+ public synchronized List<String> getQueues() throws QueueManagerException {
+ return new Vector<String>(this.queueToNodesMapping.keySet());
+ }
+
+ public synchronized List<String> getQueues(String nodeId) throws QueueManagerException {
+ Vector<String> queueNames = new Vector<String>();
+ for (String queueName : this.queueToNodesMapping.keySet())
+ if (this.queueToNodesMapping.get(queueName).contains(nodeId))
+ queueNames.add(queueName);
+ return queueNames;
+ }
+
+ public synchronized void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException {
+ if (queueName != null && this.queueToNodesMapping.containsKey(queueName))
+ this.queueToNodesMapping.get(queueName).remove(nodeId);
+ else
+ throw new QueueManagerException("Queue '" + queueName + "' does not exist");
+ }
+
+ public synchronized void removeQueue(String queueName) throws QueueManagerException {
+ if (queueName != null)
+ this.queueToNodesMapping.remove(queueName);
+ }
+
+}
+
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/QueueManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java?rev=1045244&r1=1045243&r2=1045244&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java Mon Dec 13 17:28:16 2010
@@ -22,12 +22,14 @@ package org.apache.oodt.cas.resource.sch
import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
import org.apache.oodt.cas.resource.jobqueue.JobQueue;
import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
/**
* @author woollard
+ * @author bfoster
* @version $Revision$
*
* <p>
@@ -80,4 +82,11 @@ public interface Scheduler extends Runna
*/
public JobQueue getJobQueue();
+ /**
+ *
+ * @return The underlying {@link QueueManager} used by this
+ * Scheduler.
+ */
+ public QueueManager getQueueManager();
+
}
Added: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/QueueManagerException.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/QueueManagerException.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/QueueManagerException.java (added)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/QueueManagerException.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.structs.exceptions;
+
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>An Exception thrown by the {@link QueueManager}</p>
+ *
+ */
+public class QueueManagerException extends Exception {
+
+ private static final long serialVersionUID = 7029919499578416147L;
+
+ public QueueManagerException() {
+ super();
+ }
+
+ public QueueManagerException(String msg) {
+ super(msg);
+ }
+
+ public QueueManagerException(Throwable throwable) {
+ super(throwable);
+ }
+
+ public QueueManagerException(String msg, Throwable throwable) {
+ super(msg, throwable);
+ }
+
+}
+
Propchange: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/QueueManagerException.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java?rev=1045244&r1=1045243&r2=1045244&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java (original)
+++ oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java Mon Dec 13 17:28:16 2010
@@ -31,6 +31,8 @@ import org.apache.oodt.cas.resource.jobr
import org.apache.oodt.cas.resource.jobrepo.JobRepositoryFactory;
import org.apache.oodt.cas.resource.monitor.Monitor;
import org.apache.oodt.cas.resource.monitor.MonitorFactory;
+import org.apache.oodt.cas.resource.queuerepo.QueueRepository;
+import org.apache.oodt.cas.resource.queuerepo.QueueRepositoryFactory;
import org.apache.oodt.cas.resource.scheduler.Scheduler;
import org.apache.oodt.cas.resource.scheduler.SchedulerFactory;
import org.apache.oodt.cas.resource.structs.JobInput;
@@ -38,6 +40,7 @@ import org.apache.oodt.cas.resource.stru
/**
* @author mattmann
+ * @author bfoster
* @version $Revision$
*
* <p>
@@ -120,6 +123,43 @@ public final class GenericResourceManage
}
/**
+ * Creates a new {@link QueueRepository} implementation from the given
+ * {@link QueueRepositoryFactory} class name.
+ *
+ * @param serviceFactory
+ * The class name of the {@link QueueRepositoryFactory} to use to create new
+ * {@link QueueRepository}s.
+ * @return A new implementation of a {@link QueueRepository}.
+ */
+ public static QueueRepository getQueueRepositoryFromFactory(String queueRepositoryFactory) {
+ Class clazz = null;
+ QueueRepositoryFactory factory = null;
+
+ try {
+ clazz = Class.forName(queueRepositoryFactory);
+ factory = (QueueRepositoryFactory) clazz.newInstance();
+ return factory.createQueueRepository();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "ClassNotFoundException when loading queue repository factory class "
+ + queueRepositoryFactory + " Message: " + e.getMessage());
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "InstantiationException when loading queue repository factory class "
+ + queueRepositoryFactory + " Message: " + e.getMessage());
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "IllegalAccessException when loading queue repository factory class "
+ + queueRepositoryFactory + " Message: " + e.getMessage());
+ }
+
+ return null;
+ }
+
+ /**
* Creates a new {@link JobQueue} implementation from the given
* {@link JobQueueFactory} class name.
*
Modified: oodt/trunk/resource/src/main/resources/resource.properties
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/resource.properties?rev=1045244&r1=1045243&r2=1045244&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/resources/resource.properties (original)
+++ oodt/trunk/resource/src/main/resources/resource.properties Mon Dec 13 17:28:16 2010
@@ -31,19 +31,31 @@ resource.jobqueue.factory = org.apache.o
# resource job repository factory
resource.jobrepo.factory = org.apache.oodt.cas.resource.jobrepo.MemoryJobRepositoryFactory
-# XML Assignment Monitor config properties
-org.apache.oodt.cas.resource.monitor.nodes.dirs=file://[HOME]/nodes,file://[HOME]/nodes2
+# node repository factory
+org.apache.oodt.cas.resource.nodes.repo.factory = org.apache.oodt.cas.resource.noderepo.XmlNodeRepositoryFactory
+
+# queue repository factory
+org.apache.oodt.cas.resource.queues.repo.factory = org.apache.oodt.cas.resource.queuerepo.XmlQueueRepositoryFactory
# JobStack JobQueue config properties
org.apache.oodt.cas.resource.jobqueue.jobstack.maxstacksize=1000
# XML LRUScheduler config properties
-org.apache.oodt.cas.resource.scheduler.nodetoqueues.dirs=file://[HOME]/nodes,file://[HOME]/nodes2
org.apache.oodt.cas.resource.scheduler.wait.seconds=20
# XML-RPC configuration props
org.apache.oodt.cas.resource.system.xmlrpc.requestTimeout.minutes=20
org.apache.oodt.cas.resource.system.xmlrpc.connectionTimeout.minutes=60
+# XStream JobRepo configuration props
+org.apache.oodt.cas.resource.jobrepo.xstream.working.dir=[HOME]/job-repo
+org.apache.oodt.cas.resource.jobrepo.xstream.max.history=4000
+
+# XML Node Repository config properties
+org.apache.oodt.cas.resource.nodes.dirs=file://[HOME]/nodes,file://[HOME]/nodes2
+
+# XML Queue Repository config properties
+org.apache.oodt.cas.resource.nodetoqueues.dirs=file://[HOME]/nodes,file://[HOME]/nodes2
+
Added: oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/queuerepo/TestXmlQueueRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/queuerepo/TestXmlQueueRepository.java?rev=1045244&view=auto
==============================================================================
--- oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/queuerepo/TestXmlQueueRepository.java (added)
+++ oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/queuerepo/TestXmlQueueRepository.java Mon Dec 13 17:28:16 2010
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.queuerepo;
+
+//JDK imports
+import java.io.File;
+import java.util.Arrays;
+
+//OODT imports
+import org.apache.oodt.cas.resource.scheduler.QueueManager;
+import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException;
+
+//Junit imports
+import junit.framework.TestCase;
+
+/**
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * Test Suite for the {@link XmlQueueRepository} service
+ * </p>.
+ */
+public class TestXmlQueueRepository extends TestCase {
+
+ private QueueManager queueManager;
+
+ protected void setUp() {
+ System.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs",
+ "file:"
+ + new File("./src/main/resources/examples")
+ .getAbsolutePath());
+ this.queueManager = new XmlQueueRepositoryFactory().createQueueRepository().loadQueues();
+ }
+
+ public void testMapping() throws QueueManagerException {
+ assertEquals(this.queueManager.getQueues(), Arrays.asList("quick", "high", "long"));
+ assertEquals(this.queueManager.getNodes("quick"), Arrays.asList("localhost"));
+ assertEquals(this.queueManager.getNodes("high"), Arrays.asList("localhost"));
+ assertEquals(this.queueManager.getNodes("long"), Arrays.asList("localhost"));
+ assertEquals(this.queueManager.getQueues("localhost"), Arrays.asList("quick", "high", "long"));
+
+ this.queueManager.addQueue("test-queue-1");
+ this.queueManager.addNodeToQueue("test-node-1", "test-queue-1");
+
+ assertEquals(this.queueManager.getQueues("test-node-1"), Arrays.asList("test-queue-1"));
+
+ this.queueManager.addNodeToQueue("test-node-1","quick");
+ assertEquals(this.queueManager.getQueues("test-node-1"), Arrays.asList("quick", "test-queue-1"));
+
+ this.queueManager.removeQueue("quick");
+ assertEquals(this.queueManager.getQueues("test-node-1"), Arrays.asList("test-queue-1"));
+ assertEquals(this.queueManager.getQueues("localhost"), Arrays.asList("high", "long"));
+ }
+
+}
Propchange: oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/queuerepo/TestXmlQueueRepository.java
------------------------------------------------------------------------------
svn:mime-type = text/plain