You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/03/27 17:21:19 UTC

svn commit: r1461692 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/ uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/

Author: challngr
Date: Wed Mar 27 16:21:18 2013
New Revision: 1461692

URL: http://svn.apache.org/r1461692
Log:
UIMA-2772
Rebuild state from OR publications on all restarts.  Wait for init stability
even on rebuilt state if the ducc.rm.fast.recovery property is false.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/Version.java Wed Mar 27 16:21:18 2013
@@ -33,12 +33,14 @@ package org.apache.uima.ducc.common.util
  *                    Many scripting updates. jrc
  * 2013-01-02  0.7.2  AP service support, unified ping for CUSTOM and UIMA services
  * 2013-02-03  0.7.3  First floor system from Apache distro.
+ * 2013-02-25  0.8.0  Second floor system from Apache distro. RM defrag, lots of SM updates,
+ *                      completed AP support in WS. CLI update, API creation.
  */
 public class Version
 {
     private static final int major = 0;       // Major version
-    private static final int minor = 7;       // Minor - may be API changes, or new function
-    private static final int ptf   = 3;       // Fix level, fully compatible with previous, no relevent new function
+    private static final int minor = 8;       // Minor - may be API changes, or new function
+    private static final int ptf   = 0;       // Fix level, fully compatible with previous, no relevent new function
     private static final String id = "beta";  // A short decoration, optional
 
     public final static String version()

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Wed Mar 27 16:21:18 2013
@@ -21,11 +21,13 @@ package org.apache.uima.ducc.rm;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.uima.ducc.common.Node;
 import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.utils.DuccCollectionUtils;
-import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
 import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.rm.scheduler.IJobManager;
 import org.apache.uima.ducc.rm.scheduler.IRmJob;
@@ -46,12 +48,13 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
 import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
 import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
 import org.apache.uima.ducc.transport.event.common.IDuccWork;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
-import org.apache.uima.ducc.transport.event.common.ITimeWindow;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
 import org.apache.uima.ducc.transport.event.rm.IResource;
 import org.apache.uima.ducc.transport.event.rm.IRmJobState;
 import org.apache.uima.ducc.transport.event.rm.Resource;
@@ -73,12 +76,16 @@ public class JobManagerConverter
     JobManagerUpdate lastJobManagerUpdate = new JobManagerUpdate();
 
     Map<IRmJob, IRmJob> refusedJobs = new HashMap<IRmJob, IRmJob>();
+    
+    boolean recovery = false;
 
     public JobManagerConverter(ISchedulerMain scheduler)
     {
         this.scheduler = scheduler;
         this.localMap = new DuccWorkMap();
         DuccLogger.setUnthreaded();
+
+        recovery = SystemPropertyResolver.getBooleanProperty("ducc.rm.fast.recovery", true);
     }
 
     int toInt(String s, int deflt)
@@ -587,6 +594,7 @@ public class JobManagerConverter
 
     }
 
+    boolean first_or_state = true;
     public void eventArrives(DuccWorkMap jobMap)
     {
     	String methodName = "eventArrives";
@@ -598,9 +606,25 @@ public class JobManagerConverter
             return;
         }
 
+        // Has the scheduler read its init file and finished configuring itself?
+        if ( ! scheduler.isInitialized() ) return;
+        
+        if ( first_or_state ) {
+            first_or_state = false;
+            if ( ! recoverFromOrchestrator(jobMap) ) {
+                logger.info(methodName, null, "There are no active jobs in map so can't build up state. Waiting for init stability.");
+                return;
+            } 
+            
+            if ( recovery ) {
+                logger.info(methodName, null, "Fast recovery is enabled: Recovered state from Orchestrator, starting scheduler.");
+                scheduler.start();
+            }
+        }
+
+        // scheduler is readied either by fast-recovery, or by init stability
         if ( !scheduler.ready() ) {
-            // JRC dumpOrchestratorState(jobMap);
-            logger.info(methodName, null, "Orchestrator event is discarded because scheduler is not yet ready.");
+            logger.info(methodName, null, "Orchestrator event is discarded: waiting for init stability.");
             return;
         }
 
@@ -809,59 +833,108 @@ public class JobManagerConverter
 
     }
 
-    // JRC    
-//     void dumpOrchestratorState(DuccWorkMap jobmap)
-//     {
-//     	String methodName = "dumpOrchetratorState";
-//         for ( IDuccWork w : jobmap.values() ) {
-//         	String prefix = "?";
-//             switch ( w.getDuccType() ) {
-//                 case Job:
-//                     prefix = "J";
-//                     break;
-//                 case Service:
-//                     prefix = "S";
-//                     break;
-//                 case Reservation:
-//                     prefix = "R";
-//                     break;
-//             }
-
-//             if ( w.isCompleted() ) {
-//                 logger.info(methodName, w.getDuccId(), "Ignoring completed work", prefix, w.getDuccId());
-//             }
-
-//             switch ( w.getDuccType() ) {
-//                 case Job: {
-//                     IDuccWorkExecutable de = (IDuccWorkExecutable) w;
-//                     IDuccProcessMap pm = de.getProcessMap();
-//                     logger.info(methodName, w.getDuccId(), "Received work of type", prefix, w.getDuccType(), "with", pm.size(), "processes.");
-//                     for ( IDuccProcess proc : pm.values() ) {
-//                         String pid = proc.getPID();
-//                         ProcessState state = proc.getProcessState();
-//                         logger.info(methodName, w.getDuccId(), "Found process", pid, "in state", state, "is complete:", proc.isComplete());
-//                     }
-//                     }
-//                     break;
-//                 case Service: {
-//                     IDuccWorkExecutable de = (IDuccWorkExecutable) w;
-//                     IDuccProcessMap pm = de.getProcessMap();
-//                     logger.info(methodName, w.getDuccId(), "Received work of type", prefix, w.getDuccType(), "with", pm.size(), "processes.");
-//                     }
-//                     break;
-//                 case Reservation: {
-//                     IDuccWorkReservation de = (IDuccWorkReservation) w;
-//                     IDuccReservationMap  rm = de.getReservationMap();
-//                     logger.info(methodName, w.getDuccId(), "Received work of type", prefix, w.getDuccType(), "with", rm.size(), "nodes.");
-//                     }
-//                     break;
-//                 default:
-//                     logger.info(methodName, w.getDuccId(), "Received work of type ?", w.getDuccType());
-//                     break;
-//             }
+    /**
+     * Got an OR map and we're ok for fast recovery.  If the map has no "live" jobs we just ignore it - that's first-time
+     * startup and OR will not start if there is no JD node, so we do normal init stability.  Otherwise, we assume that the
+     * JD node is included, build the resource map, and allow scheduling to proceed.
+     */
+    boolean recoverFromOrchestrator(DuccWorkMap jobmap)
+    {
+    	String methodName = "recoverFromOrchestrator";
+        Map<Node, Node> nodes = new HashMap<Node, Node>();
+        for ( IDuccWork w : jobmap.values() ) {
+        	String prefix = "?";
+            switch ( w.getDuccType() ) {
+            case Job:
+                prefix = "JOB";
+                break;
+            case Service:
+                prefix = "SVC";
+                break;
+            case Reservation:
+                prefix = "RES";
+                break;
+            }
 
-//             logger.info(methodName, null, "Work:", w);
-//         }
-//     }
+            if ( w.isCompleted() ) {
+                logger.info(methodName, w.getDuccId(), "Ignoring completed work:", w.getDuccType(), ":", w.getStateObject());
+                continue;
+            }
+                        		
+            switch ( w.getDuccType() ) {
+                case Job:
+                    {
+                        IDuccWorkExecutable de = (IDuccWorkExecutable) w;
+                        IDuccProcessMap pm = de.getProcessMap();
+                        logger.info(methodName, w.getDuccId(), "Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", pm.size(), "] Completed:", w.isCompleted());
+
+                        for ( IDuccProcess proc : pm.values() ) {
+                            String pid = proc.getPID();
+                            ProcessState state = proc.getProcessState();
+                            Node n = proc.getNode();
+                            if ( n == null ) {
+                                logger.info(methodName, w.getDuccId(), "   Process[", pid, "] state [", state, "] is complete[", proc.isComplete(), "] Node [N/A] mem[N/A");
+                            } else {
+                                long mem = n .getNodeMetrics().getNodeMemory().getMemTotal();
+                                logger.info(methodName, w.getDuccId(), 
+                                            "   Process[", pid, 
+                                            "] state [", state, 
+                                            "] is complete [", proc.isComplete(),
+                                            "] Node [", n.getNodeIdentity().getName() + "." + proc.getDuccId(),                                            
+                                            "] mem [", mem, "]");                    
+                                logger.info(methodName, w.getDuccId(), "      Recover node[", n.getNodeIdentity().getName());
+                                // 
+                                // Note, not ignoring dead processes belonging to live jobs.  Is this best or should we be
+                                // more conservative and not use nodes that we don't know 100% for sure are ok?
+                                //
+                                nodes.put(n, n);
+                            }
+                        }
+                    }
+                    break;
+
+                case Service: 
+                    {
+                        IDuccWorkExecutable de = (IDuccWorkExecutable) w;
+                        IDuccProcessMap pm = de.getProcessMap();
+                        logger.info(methodName, w.getDuccId(), prefix, w.getDuccType(), "processes[", pm.size(), "].");
+                    }
+                    break;
+                    
+                case Reservation: 
+                    {
+                        IDuccWorkReservation de = (IDuccWorkReservation) w;
+                        IDuccReservationMap  rm = de.getReservationMap();
+
+                        logger.info(methodName, w.getDuccId(), "Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", rm.size(), "] Completed:", w.isCompleted());
+                        
+                        for ( IDuccReservation r: rm.values()) {
+                            Node n = r.getNode();                        
+                            if ( n == null ) {
+                                logger.info(methodName, w.getDuccId(), 
+                                            "    Node [N/A] mem[N/A");
+                            } else {
+                                long mem = n .getNodeMetrics().getNodeMemory().getMemTotal();
+                                logger.info(methodName, w.getDuccId(), 
+                                            "   Node[", n.getNodeIdentity().getName(),
+                                            "] mem[", mem, "]");
+                                nodes.put(n, n);
+                            }
+                        }
+                    }
+                    break;
+
+                default:
+                    logger.info(methodName, w.getDuccId(), "Received work of type ?", w.getDuccType());
+                    break;
+            }
+        }
+        logger.info(methodName, null, "Recovered[", nodes.size(), "] nodes from OR state.");
+        for (Node n : nodes.values() ) {
+            scheduler.nodeArrives(n);
+        }
+        
+        return (nodes.size() != 0);
+    }
 }
 

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java Wed Mar 27 16:21:18 2013
@@ -61,6 +61,12 @@ public interface ISchedulerMain
 
     void queryMachines();
 
-    boolean ready();                    // we don't accept any state until scheduler is ready
+    // two flags are needed to cope with the asynchronous messages that can arrive at any time:
+    //    has the scheduler read its config files and initialized structures?
+    //    has the scheduler discovered enough resources that it can schedule work?
+    boolean isInitialized();              // has scheduler read all its config and set up its structures?
+    boolean ready();                    // have enough resources checked in so scheduler can schedule work?
+
+    // once both isInitialized() and ready() return true, the RM scaffolding enables scheduling by calling start()
     void start();
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1461692&r1=1461691&r2=1461692&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Wed Mar 27 16:21:18 2013
@@ -39,8 +39,6 @@ import org.apache.uima.ducc.common.utils
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
 
-import com.google.gson.Gson;
-
 
 /**
  * This process orchestrates scheduling.
@@ -133,7 +131,7 @@ public class Scheduler
     {
     }
 
-    public void init()
+    public synchronized void init()
         throws Exception
     {
         String methodName = "init";
@@ -227,6 +225,11 @@ public class Scheduler
         initialized = true;
     }
 
+    public synchronized boolean isInitialized()
+    {
+        return initialized;
+    }
+
     public Machine getMachine(NodeIdentity ni)
     {
     	return nodepool.getMachine(ni);        
@@ -638,12 +641,12 @@ public class Scheduler
      * We don't accept new work or even Orchestrator state updates until "ready". We do
      * want machines, but be sure the internal structures are protected.
      */
-    public boolean ready()
+    public synchronized boolean ready()
     {
     	return stability;
     }
 
-    public void start()
+    public synchronized void start()
     {
         stability = true;
     }
@@ -652,7 +655,7 @@ public class Scheduler
     {
     	String methodName = "handleDeadNodes";
     	
-        if ( ! initialized ) {
+        if ( ! isInitialized() ) {
             return;
         }
 
@@ -696,7 +699,11 @@ public class Scheduler
 //             return null;
 //         }
 
-        if ( ! stability ) {
+        if ( ! ready() ) {
+            return null;
+        }
+
+        if ( ! isInitialized() ) {
             return null;
         }
 
@@ -890,18 +897,9 @@ public class Scheduler
     private int total_arrivals = 0;
     public void nodeArrives(Node node)
     {        
-    	String methodName = "nodeArrives";
+    	// String methodName = "nodeArrives";
         // The first block insures the node is in the scheduler's records as soon as possible
 
-        try {
-			Gson gson = new Gson();
-			String gnode = gson.toJson(node);
-			logger.info(methodName, null, "GSON:", gnode);
-		} catch (Exception e) {
-			// TODO Auto-generated catch block
-			logger.error(methodName, null, e);
-		}
-
         total_arrivals++;       // report these in the main schedule loop
         synchronized(this) {
             // the amount of memory available for shares, adjusted with configured overhead