You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@uima.apache.org by ch...@apache.org on 2016/01/20 18:42:22 UTC
svn commit: r1725763 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler: Machine.java NodePool.java Scheduler.java Share.java
Author: challngr
Date: Wed Jan 20 17:42:22 2016
New Revision: 1725763
URL: http://svn.apache.org/viewvc?rev=1725763&view=rev
Log:
UIMA-4742: Clean up and adjust the rules for purging a node on vary-on/vary-off and on node death (first delivery).
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java?rev=1725763&r1=1725762&r2=1725763&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java Wed Jan 20 17:42:22 2016
@@ -444,19 +444,19 @@ public class Machine
return ret;
}
- RmQueriedMachine queryOfflineMachine() // UIMA-4234
- {
- RmQueriedMachine ret = queryMachine();
- ret.setOffline();
- return ret;
- }
-
- RmQueriedMachine queryUnresponsiveMachine() // UIMA-4234
- {
- RmQueriedMachine ret = queryMachine();
- ret.setUnresponsive();
- return ret;
- }
+// RmQueriedMachine queryOfflineMachine() // UIMA-4234
+// {
+// RmQueriedMachine ret = queryMachine();
+// ret.setOffline();
+// return ret;
+// }
+//
+// RmQueriedMachine queryUnresponsiveMachine() // UIMA-4234
+// {
+// RmQueriedMachine ret = queryMachine();
+// ret.setUnresponsive();
+// return ret;
+// }
/**
* A machine's investment is the sum of it's share's investments.
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1725763&r1=1725762&r2=1725763&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java Wed Jan 20 17:42:22 2016
@@ -542,13 +542,6 @@ class NodePool
Machine m = allMachines.get(n);
if ( m == null ) {
- m = unresponsiveMachines.get(n);
- }
- if ( m == null ) {
- m = offlineMachines.get(n);
- }
-
- if ( m == null ) {
for ( NodePool np : children.values() ) {
m = np.getMachine(n);
if ( m != null ) {
@@ -972,36 +965,36 @@ class NodePool
return np;
}
- private synchronized void incrementOnlineByOrder(int order)
- {
- if ( ! onlineMachinesByOrder.containsKey(order) ) {
- onlineMachinesByOrder.put(order, 1);
- } else {
- onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) + 1);
- }
- }
-
- private synchronized void decrementOnlineByOrder(int order)
- {
- onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) - 1);
- }
-
- synchronized void getLocalOnlineByOrder(int[] ret) // for queries, just me
- {
- for ( int o: onlineMachinesByOrder.keySet() ) {
- ret[o] += onlineMachinesByOrder.get(o);
- }
- }
-
- synchronized void getOnlineByOrder(int[] ret) // for queries
- {
- for ( int o: onlineMachinesByOrder.keySet() ) {
- ret[o] += onlineMachinesByOrder.get(o);
- }
- for ( NodePool child : children.values() ) {
- child.getOnlineByOrder(ret);
- }
- }
+// private synchronized void incrementOnlineByOrder(int order)
+// {
+// if ( ! onlineMachinesByOrder.containsKey(order) ) {
+// onlineMachinesByOrder.put(order, 1);
+// } else {
+// onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) + 1);
+// }
+// }
+
+// private synchronized void decrementOnlineByOrder(int order)
+// {
+// onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) - 1);
+// }
+
+// synchronized void getLocalOnlineByOrder(int[] ret) // for queries, just me
+// {
+// for ( int o: onlineMachinesByOrder.keySet() ) {
+// ret[o] += onlineMachinesByOrder.get(o);
+// }
+// }
+
+// synchronized void getOnlineByOrder(int[] ret) // for queries
+// {
+// for ( int o: onlineMachinesByOrder.keySet() ) {
+// ret[o] += onlineMachinesByOrder.get(o);
+// }
+// for ( NodePool child : children.values() ) {
+// child.getOnlineByOrder(ret);
+// }
+// }
void signalDb(Machine m, RmNodes key, Object value)
@@ -1046,65 +1039,81 @@ class NodePool
return props;
}
+ void adjustMachinesByOrder(int neworder, Machine m)
+ {
+ int oldorder = m.getShareOrder();
+ if ( oldorder != neworder ) { // can change. e.g. if it was taken offline for
+ HashMap<Node, Machine> mlist = machinesByOrder.get(oldorder);
+ mlist.remove(m.key());
+ m.setShareOrder(neworder); // hardware changes.
+
+ mlist = machinesByOrder.get(neworder);
+ if ( mlist == null ) {
+ mlist = new HashMap<Node, Machine>();
+ machinesByOrder.put(neworder, mlist);
+ }
+ mlist.put(m.key(), m);
+ }
+ }
+
/**
* Handle a new node update.
*/
Machine nodeArrives(Node node, int order)
{
String methodName = "nodeArrives";
+ // Note: the caller of this method MUST ensure that this is the
+ // right nodepool, as we do not recurse into child nodepools.
updateMaxOrder(order);
-
- for ( NodePool np : children.values() ) {
- if ( np.containsPoolNode(node) ) {
- Machine m = np.nodeArrives(node, order);
- return m;
- }
- }
- if ( allMachines.containsKey(node) ) { // already known, do nothing
- Machine m = allMachines.get(node);
- logger.trace(methodName, null, "Node ", m.getId(), " is already known, not adding.");
- return m;
- }
+ String n = node.getNodeIdentity().getName();
- if ( offlineMachines.containsKey(node) ) { // if it's offline it can't be restored like this.
+ // if it's offline it can't be restored like this.
+ if ( offlineMachines.containsKey(node) ) {
Machine m = offlineMachines.get(node);
logger.trace(methodName, null, "Node ", m.getId(), " is offline, not activating.");
return m;
}
+ // logger.info(methodName, null, "NODEARRIVES", n, "pass offline Machines");
+ // If it was unresponsive, it isn't any more, AND it's mine, so reactivate it.
if ( unresponsiveMachines.containsKey(node) ) { // reactive the node
- Machine m = unresponsiveMachines.remove(node);
- if ( m.getShareOrder() != order ) { // can change. e.g. if it was taken offline for
- m.setShareOrder(order); // hardware changes.
- }
+ logger.info(methodName, null, "RECOVER NODE", n);
+ Machine m = unresponsiveMachines.remove(node); // not unresponsive any more
+
+ // Deal with memory on the machine changing
+ adjustMachinesByOrder(order, m);
+
+ // Note: the machine is still on all the other lists, since they are not modified when it becomes unresponsive.
- allMachines.put(node, m);
- machinesByName.put(m.getId(), m);
- machinesByIp.put(m.getIp(), m);
- HashMap<Node, Machine> mlist = machinesByOrder.get(order);
- incrementOnlineByOrder(order);
- if ( mlist == null ) {
- mlist = new HashMap<Node, Machine>();
- machinesByOrder.put(order, mlist);
- }
- mlist.put(m.key(), m);
-
- total_shares += order; // UIMA-3939
signalDb(m, RmNodes.Responsive, true);
- logger.info(methodName, null, "Nodepool:", id, "Host reactivated ", m.getId(), String.format("shares %2d total %4d:", order, total_shares), m.toString());
+ logger.info(methodName, null, "Nodepool:", id, "Host reactivated ", m.getId(), "shares", order, m.toString());
return m;
}
+ // logger.info(methodName, null, "NODEARRIVES", n, "pass unresponsive Machines");
+ // Is it mine? If so, it is neither offline nor unresponsive, so it is fine and we are done.
+ if ( allMachines.containsKey(node) ) { // already known, do nothing
+ Machine m = allMachines.get(node);
+
+ // Deal with memory on the machine changing
+ adjustMachinesByOrder(order, m);
+
+ logger.trace(methodName, null, "Node ", m.getId(), " is already known, not adding.");
+ return m;
+ }
+ // logger.info(methodName, null, "NODEARRIVES", n, "pass allMachines");
+
+ // If we fall through it's a new one.
Machine machine = new Machine(node); // brand new machine, make it active
Node key = machine.key();
machine.setShareOrder(order);
allMachines.put(key, machine); // global list
machinesByName.put(machine.getId(), machine);
machinesByIp.put(machine.getIp(), machine);
- incrementOnlineByOrder(order);
+ //incrementOnlineByOrder(order);
machine.setNodepool(this);
total_shares += order;
@@ -1125,71 +1134,48 @@ class NodePool
props.put(RmNodes.Responsive, true);
props.put(RmNodes.Online, true);
try {
- persistence.createMachine(machine.getId(), props);
- } catch (Exception e) {
- logger.warn(methodName, null, "Cannot write machine to DB:", machine.getId(), e);
- }
+ persistence.createMachine(machine.getId(), props);
+ } catch (Exception e) {
+ logger.warn(methodName, null, "Cannot write machine to DB:", machine.getId(), e);
+ }
return machine;
}
- void disable(Machine m, HashMap<Node, Machine> disableMap)
+ void disable(Machine m)
{
String methodName = "disable";
- if ( allMachines.containsKey(m.key()) ) {
- logger.info(methodName, null, "Nodepool:", id, "Host disabled:", m.getId(), "Looking for shares to clear");
-
- int order = m.getShareOrder();
- String name = m.getId();
- String ip = m .getIp();
-
- HashMap<Share, Share> shares = m.getActiveShares();
- for (Share s : shares.values()) {
- IRmJob j = s.getJob();
-
- if ( j.getDuccType() == DuccType.Reservation ) {
- // UIMA-3614. Only actual reservation is left intact
- logger.info(methodName, null, "Nodepool:", id, "Host dead/offline:", m.getId(), "Not purging", j.getDuccType());
- break;
- }
-
- switch ( j.getDuccType() ) {
- case Reservation:
- // UIMA-3614. Only actual reservation is left intact
- logger.info(methodName, null, "Nodepool:", id, "Host dead/offline:", m.getId(), "Not purging", j.getDuccType());
- break;
+ logger.info(methodName, null, "Nodepool:", id, "Host disabled:", m.getId(), "Looking for shares to clear");
- case Service:
- case Pop:
- j.markComplete(); // UIMA-4327 Must avoid reallocation, these guys are toast if they get purged.
- logger.info(methodName, null, "Nodepool:", id, "Host dead/offline:", m.getId(), "Mark service/pop completed.");
- // NO BREAK, must fall through
- case Job:
- default:
- break;
- }
+ int order = m.getShareOrder();
+ String name = m.getId();
+ String ip = m .getIp();
+
+ HashMap<Share, Share> shares = m.getActiveShares();
+ for (Share s : shares.values()) {
+ IRmJob j = s.getJob();
+ if ( j.getSchedulingPolicy() != Policy.FAIR_SHARE ) {
+ logger.info(methodName, j.getId(), "Nodepool:", id, "Host dead/offline:", m.getId(), "Not purging NP work", j.getDuccType());
+ } else {
logger.info(methodName, j.getId(), "Nodepool:", id, "Purge", j.getDuccType(), "on dead/offline:", m.getId());
j.shrinkByOne(s);
nPendingByOrder[order]++;
-
- s.purge(); // This bet tells OR not to wait for confirmation from the agent
+
+ s.purge(); // This bit tells OR not to wait for confirmation from the agent
}
+ }
+ }
- allMachines.remove(m.key());
- decrementOnlineByOrder(order);
- total_shares -= order;
- disableMap.put(m.key(), m);
-
- HashMap<Node, Machine> machs = machinesByOrder.get(order);
- machs.remove(m.key());
- if ( machs.size() == 0 ) {
- machinesByOrder.remove(order);
- }
- machinesByName.remove(name);
- machinesByIp.remove(ip);
- logger.info(methodName, null, "Nodepool:", id, "Node leaves:", m.getId(), "total shares:", total_shares);
+ void nodeLeaves(Machine m)
+ {
+ // Note: simpler than varyoff because we don't need to report unusual
+ // conditions; there is nobody to tell.
+ if ( allMachines.containsKey(m.key()) ) {
+ disable(m);
+ unresponsiveMachines.put(m.key(), m);
+ signalDb(m, RmNodes.Responsive, false);
} else {
for ( NodePool np : children.values() ) {
np.nodeLeaves(m);
@@ -1197,61 +1183,78 @@ class NodePool
}
}
- void nodeLeaves(Machine m)
- {
- disable(m, unresponsiveMachines);
- signalDb(m, RmNodes.Responsive, false);
- }
-
// UIMA-4142
// helper for CLI things that refer to things by name only. do we know about anything by this
// name? see resolve() in Scheduler.java.
boolean hasNode(String n)
{
- if ( machinesByName.containsKey(n) ) return true;
+ return machinesByName.containsKey(n);
+ }
- // If not we have to search the offline machines and the unresponsive machines which are
- // keyed differently. This is really ugly but hard to fix at this point, so cope.
- for ( Node node : offlineMachines.keySet() ) {
- if ( node.getNodeIdentity().getName().equals(n) ) return true;
- }
- for ( Node node : unresponsiveMachines.keySet() ) {
- if ( node.getNodeIdentity().getName().equals(n) ) return true;
+ NodePool findNodepoolByNodename(String n)
+ {
+ if ( hasNode(n) ) {
+ return this;
+ } else {
+ for ( NodePool np : children.values() ) {
+ NodePool ret = np.findNodepoolByNodename(n);
+ if ( ret != null ) {
+ return ret;
+ }
+ }
}
- return false;
+ return null;
}
- String varyoff(String node)
+ private String doVaryOff(String node)
{
+ // The caller must ensure the node is known to this nodepool.
Machine m = machinesByName.get(node);
- if ( m == null ) {
- // ok, maybe it's already offline or maybe dead
- // relatively rare, cleaner to search than to make yet another index
+ if (offlineMachines.containsKey(m.key()) ) {
+ return "VaryOff: Nodepool " + id + " - Already offline: " + node;
+ }
- for ( Machine mm : offlineMachines.values() ) {
- if ( mm.getId().equals(node) ) {
- return "VaryOff: Nodepool " + id + " - Already offline: " + node;
- }
- }
+ if ( unresponsiveMachines.containsKey(m.key()) ) {
+ // Let's be friendly and tell the caller it is also unresponsive.
+ offlineMachines.put(m.key(), m);
+ signalDb(m, RmNodes.Online, false);
+ return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node;
+ }
- Iterator<Machine> iter = unresponsiveMachines.values().iterator();
- while ( iter.hasNext() ) {
- Machine mm = iter.next();
- if ( mm.getId().equals(node) ) {
- Node key = mm.key();
- iter.remove();
- offlineMachines.put(key, mm);
- signalDb(m, RmNodes.Online, false);
- return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node;
- }
- }
+ offlineMachines.put(m.key(), m);
+ disable(m);
+ signalDb(m, RmNodes.Online, false);
+ return "VaryOff: " + node + " - OK.";
+ }
+ String varyoff(String node)
+ {
+ // Note: slightly trickier than 'nodeLeaves' because we need to catch
+ // potential user errors and report them back.
+ NodePool np = findNodepoolByNodename(node);
+ if ( np == null ) {
return "VaryOff: Nodepool " + id + " - Cannot find machine: " + node;
}
- disable(m, offlineMachines);
- signalDb(m, RmNodes.Online, false);
- return "VaryOff: " + node + " - OK.";
+ // Only reached once we know the node has been found and associated with a nodepool.
+ return np.doVaryOff(node); // must direct to the correct context
+ }
+
+ private String doVaryOn(String node)
+ {
+
+ // The caller must ensure the node is known to this nodepool.
+ Machine m = machinesByName.get(node);
+ Node key = m.key();
+
+ if ( ! offlineMachines.containsKey(key) ) {
+ return "VaryOn: Nodepool " + id + " - Already online: " + m.getId();
+ }
+
+ offlineMachines.remove(key);
+ signalDb(m, RmNodes.Online, true);
+
+ return "VaryOn: Nodepool " + id + " - Machine marked online: " + node;
}
/**
@@ -1260,30 +1263,21 @@ class NodePool
*/
String varyon(String node)
{
- if ( machinesByName.containsKey(node) ) {
- return "VaryOn: Nodepool " + id + " - Already online: " + node;
+ NodePool np = findNodepoolByNodename(node);
+ if ( np == null ) {
+ return "VaryOff: Nodepool " + id + " - Cannot find machine: " + node;
}
- Iterator<Machine> iter = offlineMachines.values().iterator();
- while ( iter.hasNext() ) {
- Machine mm = iter.next();
- if ( mm.getId().equals(node) ) {
- iter.remove();
- signalDb(mm, RmNodes.Online, true);
- return "VaryOn: Nodepool " + id + " - Machine marked online: " + node;
- }
- }
+ return np.doVaryOn(node); // must pass to the right nodepool, can't do it "here"
+ }
- iter = unresponsiveMachines.values().iterator();
- while ( iter.hasNext() ) {
- Machine mm = iter.next();
- if ( mm.getId().equals(node) ) {
- signalDb(mm, RmNodes.Online, true);
- return "VaryOn: Nodepool " + id + " - Machine is online but not responsive: " + node;
- }
- }
-
- return "VaryOn: Nodepool " + id + " - Cannot find machine: " + node;
+ boolean isSchedulable(Machine m)
+ {
+ if ( m.isBlacklisted() ) return false;
+ if ( unresponsiveMachines.containsKey(m.key()) ) return false;
+ if ( offlineMachines.containsKey(m.key()) ) return false;
+
+ return true;
}
/**
@@ -1773,7 +1767,7 @@ class NodePool
ml.addAll(machs.values());
for ( Machine m : ml ) { // look for space
- if ( m.isBlacklisted() ) continue; // nope
+ if ( !isSchedulable(m) ) continue; // nope
if ( (!allowVertical) && (m.hasVerticalConflict(j)) ) continue; // UIMA-4712
int g = Math.min(needed, m.countFreeShares(order)); // adjust by the order supported on the machine
for ( int ndx= 0; ndx < g; ndx++ ) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1725763&r1=1725762&r2=1725763&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Wed Jan 20 17:42:22 2016
@@ -902,9 +902,11 @@ public class Scheduler
// tracking the OR hang problem - are topics being delivered?
logger.info("nodeArrives", null, "Total arrivals:", total_arrivals);
- handleIllNodes();
- handleDeadNodes();
- resetNodepools();
+ synchronized(this) {
+ handleIllNodes();
+ handleDeadNodes();
+ resetNodepools();
+ }
// TODO: Can we combine these two into one?
SchedulingUpdate upd = new SchedulingUpdate(); // state from internal scheduler
@@ -1166,23 +1168,20 @@ public class Scheduler
// The first block insures the node is in the scheduler's records as soon as possible
total_arrivals++; // report these in the main schedule loop
- // the amount of memory available for shares, adjusted with configured overhead
- NodePool np = getNodepoolByName(node.getNodeIdentity());
+ NodePool np = getNodepoolByName(node.getNodeIdentity()); // finds np assigned in ducc.nodes; if none, returns the default np
Machine m = np.getMachine(node);
int share_order = 0;
-
- if ( m == null ) {
- // allNodes.put(node, node);
- long allocatable_mem = node.getNodeMetrics().getNodeMemory().getMemFree() - share_free_dram;
- if ( dramOverride > 0 ) {
- allocatable_mem = dramOverride;
- }
- share_order = (int) (allocatable_mem / np.getShareQuantum()); // conservative - rounds down (this will always cast ok)
- } else {
- share_order = m.getShareOrder();
- }
-
+
+ // Always recalculate this in case it has changed for any reason (reboot, a pinned process exiting, etc.).
+ long allocatable_mem = node.getNodeMetrics().getNodeMemory().getMemFree() - share_free_dram;
+ if ( dramOverride > 0 ) {
+ allocatable_mem = dramOverride;
+ }
+ share_order = (int) (allocatable_mem / np.getShareQuantum()); // conservative - rounds down (this will always cast ok)
+ // NOTE: we cannot set the order into the machine yet, in case it has changed, because NodePool needs
+ // both the current and the new order to adjust its per-order index.
+
max_order = Math.max(share_order, max_order);
m = np.nodeArrives(node, share_order); // announce to the nodepools
m.heartbeatArrives();
@@ -1307,7 +1306,7 @@ public class Scheduler
freeMachines[i] += np.countFreeMachines(i); // (these are local, as we want)
}
- np.getLocalOnlineByOrder(onlineMachines);
+ //np.getLocalOnlineByOrder(onlineMachines);
ret.setOnlineMachines(onlineMachines);
ret.setFreeMachines(freeMachines);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1725763&r1=1725762&r2=1725763&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Wed Jan 20 17:42:22 2016
@@ -294,7 +294,7 @@ public class Share
} catch (Exception e) {
logger.warn(methodName, job.getId(), "Cannot update share statistics in database for share", id, e);
}
- logger.info(methodName, jobid, "UPDATE:", investment, state, getInitializationTime(), pid);
+
return true;
}