You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/03/27 17:21:19 UTC
svn commit: r1461692 - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/
uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/
uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/
Author: challngr
Date: Wed Mar 27 16:21:18 2013
New Revision: 1461692
URL: http://svn.apache.org/r1461692
Log:
UIMA-2772
Rebuild state from OR (Orchestrator) publications on all restarts. Wait for init stability
even on rebuilt state if the ducc.rm.fast.recovery property is false.
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java Wed Mar 27 16:21:18 2013
@@ -33,12 +33,14 @@ package org.apache.uima.ducc.common.util
* Many scripting updates. jrc
* 2013-01-02 0.7.2 AP service support, unified ping for CUSTOM and UIMA services
* 2013-02-03 0.7.3 First floor system from Apache distro.
+ * 2013-02-25 0.8.0 Second floor system from Apache distro. RM defrag, lots of SM updates,
+ * completed AP support in WS. CLI update, API creation.
*/
public class Version
{
private static final int major = 0; // Major version
- private static final int minor = 7; // Minor - may be API changes, or new function
- private static final int ptf = 3; // Fix level, fully compatible with previous, no relevent new function
+ private static final int minor = 8; // Minor - may be API changes, or new function
+ private static final int ptf = 0; // Fix level, fully compatible with previous, no relevent new function
private static final String id = "beta"; // A short decoration, optional
public final static String version()
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Wed Mar 27 16:21:18 2013
@@ -21,11 +21,13 @@ package org.apache.uima.ducc.rm;
import java.util.HashMap;
import java.util.Map;
+import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.utils.DuccCollectionUtils;
-import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.rm.scheduler.IJobManager;
import org.apache.uima.ducc.rm.scheduler.IRmJob;
@@ -46,12 +48,13 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
-import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.rm.IResource;
import org.apache.uima.ducc.transport.event.rm.IRmJobState;
import org.apache.uima.ducc.transport.event.rm.Resource;
@@ -73,12 +76,16 @@ public class JobManagerConverter
JobManagerUpdate lastJobManagerUpdate = new JobManagerUpdate();
Map<IRmJob, IRmJob> refusedJobs = new HashMap<IRmJob, IRmJob>();
+
+ boolean recovery = false;
public JobManagerConverter(ISchedulerMain scheduler)
{
this.scheduler = scheduler;
this.localMap = new DuccWorkMap();
DuccLogger.setUnthreaded();
+
+ recovery = SystemPropertyResolver.getBooleanProperty("ducc.rm.fast.recovery", true);
}
int toInt(String s, int deflt)
@@ -587,6 +594,7 @@ public class JobManagerConverter
}
+ boolean first_or_state = true;
public void eventArrives(DuccWorkMap jobMap)
{
String methodName = "eventArrives";
@@ -598,9 +606,25 @@ public class JobManagerConverter
return;
}
+ // The init file is read and configured ?
+ if ( ! scheduler.isInitialized() ) return;
+
+ if ( first_or_state ) {
+ first_or_state = false;
+ if ( ! recoverFromOrchestrator(jobMap) ) {
+ logger.info(methodName, null, "There are no active jobs in map so can't build up state. Waiting for init stability.");
+ return;
+ }
+
+ if ( recovery ) {
+ logger.info(methodName, null, "Fast recovery is enabled: Recovered state from Orchestrator, starting scheduler.");
+ scheduler.start();
+ }
+ }
+
+ // scheduler is readied either by fast-recovery, or by init stability
if ( !scheduler.ready() ) {
- // JRC dumpOrchestratorState(jobMap);
- logger.info(methodName, null, "Orchestrator event is discarded because scheduler is not yet ready.");
+ logger.info(methodName, null, "Orchestrator event is discarded: waiting for init stability.");
return;
}
@@ -809,59 +833,108 @@ public class JobManagerConverter
}
- // JRC
-// void dumpOrchestratorState(DuccWorkMap jobmap)
-// {
-// String methodName = "dumpOrchetratorState";
-// for ( IDuccWork w : jobmap.values() ) {
-// String prefix = "?";
-// switch ( w.getDuccType() ) {
-// case Job:
-// prefix = "J";
-// break;
-// case Service:
-// prefix = "S";
-// break;
-// case Reservation:
-// prefix = "R";
-// break;
-// }
-
-// if ( w.isCompleted() ) {
-// logger.info(methodName, w.getDuccId(), "Ignoring completed work", prefix, w.getDuccId());
-// }
-
-// switch ( w.getDuccType() ) {
-// case Job: {
-// IDuccWorkExecutable de = (IDuccWorkExecutable) w;
-// IDuccProcessMap pm = de.getProcessMap();
-// logger.info(methodName, w.getDuccId(), "Received work of type", prefix, w.getDuccType(), "with", pm.size(), "processes.");
-// for ( IDuccProcess proc : pm.values() ) {
-// String pid = proc.getPID();
-// ProcessState state = proc.getProcessState();
-// logger.info(methodName, w.getDuccId(), "Found process", pid, "in state", state, "is complete:", proc.isComplete());
-// }
-// }
-// break;
-// case Service: {
-// IDuccWorkExecutable de = (IDuccWorkExecutable) w;
-// IDuccProcessMap pm = de.getProcessMap();
-// logger.info(methodName, w.getDuccId(), "Received work of type", prefix, w.getDuccType(), "with", pm.size(), "processes.");
-// }
-// break;
-// case Reservation: {
-// IDuccWorkReservation de = (IDuccWorkReservation) w;
-// IDuccReservationMap rm = de.getReservationMap();
-// logger.info(methodName, w.getDuccId(), "Received work of type", prefix, w.getDuccType(), "with", rm.size(), "nodes.");
-// }
-// break;
-// default:
-// logger.info(methodName, w.getDuccId(), "Received work of type ?", w.getDuccType());
-// break;
-// }
+ /**
+ * Got an OR map and we're ok for fast recovery. If the map has no "live" jobs we just ignore it - that's first-time
+ * startup and OR will not start if there is no JD node, so we do normal init stability. Otherwise, we assume that the
+ * JD node is included, build the resource map, and allow scheduling to proceed.
+ */
+ boolean recoverFromOrchestrator(DuccWorkMap jobmap)
+ {
+ String methodName = "recoverFromOrchestrator";
+ Map<Node, Node> nodes = new HashMap<Node, Node>();
+ for ( IDuccWork w : jobmap.values() ) {
+ String prefix = "?";
+ switch ( w.getDuccType() ) {
+ case Job:
+ prefix = "JOB";
+ break;
+ case Service:
+ prefix = "SVC";
+ break;
+ case Reservation:
+ prefix = "RES";
+ break;
+ }
-// logger.info(methodName, null, "Work:", w);
-// }
-// }
+ if ( w.isCompleted() ) {
+ logger.info(methodName, w.getDuccId(), "Ignoring completed work:", w.getDuccType(), ":", w.getStateObject());
+ continue;
+ }
+
+ switch ( w.getDuccType() ) {
+ case Job:
+ {
+ IDuccWorkExecutable de = (IDuccWorkExecutable) w;
+ IDuccProcessMap pm = de.getProcessMap();
+ logger.info(methodName, w.getDuccId(), "Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", pm.size(), "] Completed:", w.isCompleted());
+
+ for ( IDuccProcess proc : pm.values() ) {
+ String pid = proc.getPID();
+ ProcessState state = proc.getProcessState();
+ Node n = proc.getNode();
+ if ( n == null ) {
+ logger.info(methodName, w.getDuccId(), " Process[", pid, "] state [", state, "] is complete[", proc.isComplete(), "] Node [N/A] mem[N/A");
+ } else {
+ long mem = n .getNodeMetrics().getNodeMemory().getMemTotal();
+ logger.info(methodName, w.getDuccId(),
+ " Process[", pid,
+ "] state [", state,
+ "] is complete [", proc.isComplete(),
+ "] Node [", n.getNodeIdentity().getName() + "." + proc.getDuccId(),
+ "] mem [", mem, "]");
+ logger.info(methodName, w.getDuccId(), " Recover node[", n.getNodeIdentity().getName());
+ //
+ // Note, not ignoring dead processes belonging to live jobs. Is this best or should we be
+ // more conservative and not use nodes that we don't know 100% for sure are ok?
+ //
+ nodes.put(n, n);
+ }
+ }
+ }
+ break;
+
+ case Service:
+ {
+ IDuccWorkExecutable de = (IDuccWorkExecutable) w;
+ IDuccProcessMap pm = de.getProcessMap();
+ logger.info(methodName, w.getDuccId(), prefix, w.getDuccType(), "processes[", pm.size(), "].");
+ }
+ break;
+
+ case Reservation:
+ {
+ IDuccWorkReservation de = (IDuccWorkReservation) w;
+ IDuccReservationMap rm = de.getReservationMap();
+
+ logger.info(methodName, w.getDuccId(), "Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", rm.size(), "] Completed:", w.isCompleted());
+
+ for ( IDuccReservation r: rm.values()) {
+ Node n = r.getNode();
+ if ( n == null ) {
+ logger.info(methodName, w.getDuccId(),
+ " Node [N/A] mem[N/A");
+ } else {
+ long mem = n .getNodeMetrics().getNodeMemory().getMemTotal();
+ logger.info(methodName, w.getDuccId(),
+ " Node[", n.getNodeIdentity().getName(),
+ "] mem[", mem, "]");
+ nodes.put(n, n);
+ }
+ }
+ }
+ break;
+
+ default:
+ logger.info(methodName, w.getDuccId(), "Received work of type ?", w.getDuccType());
+ break;
+ }
+ }
+ logger.info(methodName, null, "Recovered[", nodes.size(), "] nodes from OR state.");
+ for (Node n : nodes.values() ) {
+ scheduler.nodeArrives(n);
+ }
+
+ return (nodes.size() != 0);
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java Wed Mar 27 16:21:18 2013
@@ -61,6 +61,12 @@ public interface ISchedulerMain
void queryMachines();
- boolean ready(); // we don't accept any state until scheduler is ready
+ // two flags are needed to cope with the asynchronous messages that can arrive at any time:
+ // has the scheduler read it's config files and initialized structures?
+ // has the scheduler discovered enough resources that it can schedule work?
+ boolean isInitialized(); // has scheduler read all it's config and set up its strucures?
+ boolean ready(); // have enough resources checked in so scheduler can schedule work?
+
+ // once both initialized() and ready() occur, the RM scaffolding will enable scheduling by calling start
void start();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Wed Mar 27 16:21:18 2013
@@ -39,8 +39,6 @@ import org.apache.uima.ducc.common.utils
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
-import com.google.gson.Gson;
-
/**
* This process orchestrates scheduling.
@@ -133,7 +131,7 @@ public class Scheduler
{
}
- public void init()
+ public synchronized void init()
throws Exception
{
String methodName = "init";
@@ -227,6 +225,11 @@ public class Scheduler
initialized = true;
}
+ public synchronized boolean isInitialized()
+ {
+ return initialized;
+ }
+
public Machine getMachine(NodeIdentity ni)
{
return nodepool.getMachine(ni);
@@ -638,12 +641,12 @@ public class Scheduler
* We don't accept new work or even Orchestrator state updates until "ready". We do
* want machines, but be sure the internal structures are protected.
*/
- public boolean ready()
+ public synchronized boolean ready()
{
return stability;
}
- public void start()
+ public synchronized void start()
{
stability = true;
}
@@ -652,7 +655,7 @@ public class Scheduler
{
String methodName = "handleDeadNodes";
- if ( ! initialized ) {
+ if ( ! isInitialized() ) {
return;
}
@@ -696,7 +699,11 @@ public class Scheduler
// return null;
// }
- if ( ! stability ) {
+ if ( ! ready() ) {
+ return null;
+ }
+
+ if ( ! isInitialized() ) {
return null;
}
@@ -890,18 +897,9 @@ public class Scheduler
private int total_arrivals = 0;
public void nodeArrives(Node node)
{
- String methodName = "nodeArrives";
+ // String methodName = "nodeArrives";
// The first block insures the node is in the scheduler's records as soon as possible
- try {
- Gson gson = new Gson();
- String gnode = gson.toJson(node);
- logger.info(methodName, null, "GSON:", gnode);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- logger.error(methodName, null, e);
- }
-
total_arrivals++; // report these in the main schedule loop
synchronized(this) {
// the amount of memory available for shares, adjusted with configured overhead