You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/03/09 16:09:12 UTC
svn commit: r1454731 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm:
ServiceHandler.java ServiceManagerComponent.java ServiceSet.java
Author: challngr
Date: Sat Mar 9 15:09:12 2013
New Revision: 1454731
URL: http://svn.apache.org/r1454731
Log:
UIMA-2728
Fix race with Orchestrator state during service startup and CLI/API commands.
See Jira for details.
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1454731&r1=1454730&r2=1454731&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Sat Mar 9 15:09:12 2013
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.sm;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -57,15 +58,18 @@ public class ServiceHandler
private ServiceStateHandler serviceStateHandler = new ServiceStateHandler();
private ServiceMap serviceMap = new ServiceMap(); // note this is the sync object for publish
- private HashMap<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>();
- private HashMap<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>();
+ private Map<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>();
+ private Map<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>();
- private HashMap<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>();
- private HashMap<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>();
-
- private HashMap<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>();
- private HashMap<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>();
+ private Map<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>();
+ private Map<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>();
+ private Map<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>();
+ private Map<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>();
+
+ private List<ApiHandler> pendingRequests = new LinkedList<ApiHandler>();
+ private Object stateUpdateLock = new Object();
+
public ServiceHandler(IServiceManager serviceManager)
{
this.serviceManager = serviceManager;
@@ -82,6 +86,7 @@ public class ServiceHandler
}
try {
+ runCommands(); // enqueued orders that came in while I was away
processUpdates();
} catch (Throwable t) {
logger.error(methodName, null, t);
@@ -106,54 +111,48 @@ public class ServiceHandler
{
String methodName = "processUpdates";
logger.info(methodName, null, "Processing updates.");
- HashMap<DuccId, IDuccWork> incoming;
-
- incoming = new HashMap<DuccId, IDuccWork>();
- synchronized(deletedJobs) {
- incoming.putAll(deletedJobs);
- deletedJobs.clear();
- }
- handleDeletedJobs(incoming);
-
- incoming = new HashMap<DuccId, IDuccWork>();
- synchronized(modifiedJobs) {
- incoming.putAll(modifiedJobs);
- modifiedJobs.clear();
- }
- handleModifiedJobs(incoming);
-
- incoming = new HashMap<DuccId, IDuccWork>();
- synchronized(deletedServices) {
- incoming.putAll(deletedServices);
- deletedServices.clear();
- }
- handleDeletedServices(incoming);
-
- incoming = new HashMap<DuccId, IDuccWork>();
- synchronized(modifiedServices) {
- incoming.putAll(modifiedServices);
- modifiedServices.clear();
- }
- handleModifiedServices(incoming);
- handleImplicitServices();
-
- incoming = new HashMap<DuccId, IDuccWork>();
- synchronized(newServices) {
- incoming.putAll(newServices);
- newServices.clear();
+ Map<DuccId, IDuccWork> deletedJobsMap = new HashMap<DuccId, IDuccWork>();
+ Map<DuccId, IDuccWork> modifiedJobsMap = new HashMap<DuccId, IDuccWork>();
+ Map<DuccId, IDuccWork> newJobsMap = new HashMap<DuccId, IDuccWork>();
+ Map<DuccId, IDuccWork> deletedServicesMap = new HashMap<DuccId, IDuccWork>();
+ Map<DuccId, IDuccWork> modifiedServicesMap = new HashMap<DuccId, IDuccWork>();
+ Map<DuccId, IDuccWork> newServicesMap = new HashMap<DuccId, IDuccWork>();
+
+ synchronized(stateUpdateLock) {
+ deletedJobsMap.putAll(deletedJobs);
+ deletedJobs.clear();
+
+ modifiedJobsMap.putAll(modifiedJobs);
+ modifiedJobs.clear();
+
+ deletedServicesMap.putAll(deletedServices);
+ deletedServices.clear();
+
+ modifiedServicesMap.putAll(modifiedServices);
+ modifiedServices.clear();
+
+ newServicesMap.putAll(newServices);
+ newServices.clear();
+
+ newJobsMap.putAll(newJobs);
+ newJobs.clear();
}
- handleNewServices(incoming);
- incoming = new HashMap<DuccId, IDuccWork>();
- synchronized(newJobs) {
- incoming.putAll(newJobs);
- newJobs.clear();
- }
- handleNewJobs(incoming);
+ // We could potentially have several updates where a service or job arrives, is modified, and then deleted, while
+ // we are busy. Need to handle them in the right order.
+ //
+ // Jobs are dependent on services but not the other way around - I think we need to handle services first,
+ // to avoid the case where something is dependent on something that will exist soon but doesn't currently.
+ handleNewServices (newServicesMap );
+ handleModifiedServices(modifiedServicesMap);
+ handleDeletedServices (deletedServicesMap );
+ handleImplicitServices( );
+
+ handleNewJobs (newJobsMap );
+ handleModifiedJobs (modifiedJobsMap );
+ handleDeletedJobs (deletedJobsMap );
- synchronized(serviceMap) {
- serviceManager.publish(serviceMap);
- }
+ serviceManager.publish(serviceMap);
List<ServiceSet> regsvcs = serviceStateHandler.getRegisteredServices();
for ( ServiceSet sset : regsvcs ) {
@@ -172,35 +171,40 @@ public class ServiceHandler
)
{
- synchronized(newJobs) {
- this.newJobs.putAll(newJobs);
- }
-
- synchronized(newServices) {
- this.newServices.putAll(newServices);
- }
-
- synchronized(deletedJobs) {
- this.deletedJobs.putAll(deletedJobs);
- }
-
- synchronized(deletedServices) {
+ synchronized(stateUpdateLock) {
+ this.newJobs.putAll(newJobs);
+ this.newServices.putAll(newServices);
+ this.deletedJobs.putAll(deletedJobs);
this.deletedServices.putAll(deletedServices);
- }
-
- synchronized(modifiedJobs) {
this.modifiedJobs.putAll(modifiedJobs);
- }
-
- synchronized(modifiedServices) {
this.modifiedServices.putAll(modifiedServices);
}
-
synchronized(this) {
notify();
}
}
-
+
+ void runCommands()
+ {
+ String methodName = "runCommands";
+ LinkedList<ApiHandler> tmp = new LinkedList<ApiHandler>();
+ synchronized(pendingRequests) {
+ tmp.addAll(pendingRequests);
+ pendingRequests.clear();
+ }
+ logger.info(methodName, null, "Running", tmp.size(), "API Tasks.");
+ for ( ApiHandler apih : tmp ) {
+ apih.run();
+ }
+ }
+
+ void addApiTask(ApiHandler apih)
+ {
+ synchronized(pendingRequests) {
+ pendingRequests.add(apih);
+ }
+ }
+
/**
* Resolves state for the job in id based on the what it is dependent upon - the independent services
*/
@@ -295,7 +299,7 @@ public class ServiceHandler
return jobServices;
}
- protected void handleNewJobs(HashMap<DuccId, IDuccWork> work)
+ protected void handleNewJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleNewJobs";
@@ -380,7 +384,7 @@ public class ServiceHandler
serviceStateHandler.removeServicesForJob(id);
}
- protected void handleDeletedJobs(HashMap<DuccId, IDuccWork> work)
+ protected void handleDeletedJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleCompletedJobs";
@@ -401,7 +405,7 @@ public class ServiceHandler
serviceMap.removeAll(work.keySet());
}
- protected void handleModifiedJobs(HashMap<DuccId, IDuccWork> work)
+ protected void handleModifiedJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleModifiedJobs";
@@ -433,7 +437,7 @@ public class ServiceHandler
}
- protected void handleNewServices(HashMap<DuccId, IDuccWork> work)
+ protected void handleNewServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleNewServices";
@@ -544,7 +548,7 @@ public class ServiceHandler
// We're here because we got OR state for the service that it has stopped running.
// Must clean up.
//
- protected void handleDeletedServices(HashMap<DuccId, IDuccWork> work)
+ protected void handleDeletedServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleDeletedServices";
@@ -595,7 +599,7 @@ public class ServiceHandler
}
}
- protected void handleModifiedServices(HashMap<DuccId, IDuccWork> work)
+ protected void handleModifiedServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleModifiedServices";
@@ -753,10 +757,13 @@ public class ServiceHandler
sset.getId());
}
- // only start something if we don't have enought already going
- ApiHandler apih = new ApiHandler(ev, this);
- Thread t = new Thread(apih);
- t.start();
+ pendingRequests.add(new ApiHandler(ev, this));
+
+// // only start something if we don't have enough already going
+// ApiHandler apih = new ApiHandler(ev, this);
+// Thread t = new Thread(apih);
+// t.start();
+
return new ServiceReplyEvent(ServiceCode.OK,
"Service " + serviceIdString + " start request accepted, new instances[" + wanted + "]",
sset.getKey(),
@@ -833,9 +840,10 @@ public class ServiceHandler
if ( tolose > 0 ) {
- ApiHandler apih = new ApiHandler(ev, this);
- Thread t = new Thread(apih);
- t.start();
+ pendingRequests.add(new ApiHandler(ev, this));
+// ApiHandler apih = new ApiHandler(ev, this);
+// Thread t = new Thread(apih);
+// t.start();
}
return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " stop request accepted for [" + tolose + "] instances.", sset.getKey(), sset.getId());
@@ -951,10 +959,10 @@ public class ServiceHandler
}
if ( sset.isRegistered() ) {
- ApiHandler apih = new ApiHandler(ev, this);
-
- Thread t = new Thread(apih);
- t.start();
+ pendingRequests.add(new ApiHandler(ev, this));
+// ApiHandler apih = new ApiHandler(ev, this);
+// Thread t = new Thread(apih);
+// t.start();
return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " modify request accepted.", sset.getKey(), sset.getId());
} else {
return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + friendly + " is not a known service.", sset.getKey(), null);
@@ -1005,9 +1013,10 @@ public class ServiceHandler
if ( sset.isRegistered() ) {
sset.deregister(); // just sets a flag so we know how to handle it when it starts to die
- ApiHandler apih = new ApiHandler(ev, this);
- Thread t = new Thread(apih);
- t.start();
+ pendingRequests.add(new ApiHandler(ev, this));
+// ApiHandler apih = new ApiHandler(ev, this);
+// Thread t = new Thread(apih);
+// t.start();
return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " unregistered. Shutting down implementors.", sset.getKey(), sset.getId());
} else {
return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " is not a registered service.", sset.getKey(), null);
@@ -1184,12 +1193,12 @@ public class ServiceHandler
{
// Map of active service descriptors by endpoint. For UIMA services, key is the endpoint.
- private HashMap<String, ServiceSet> servicesByName = new HashMap<String, ServiceSet>();
- private HashMap<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>();
+ private Map<String, ServiceSet> servicesByName = new HashMap<String, ServiceSet>();
+ private Map<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>();
// For each job, the collection of services it is dependent upon
// DUccId is a Job Id (or id for serice that has dependencies)
- private HashMap<DuccId, Map<String, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<String, ServiceSet>>();
+ private Map<DuccId, Map<String, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<String, ServiceSet>>();
ServiceStateHandler()
{
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java?rev=1454731&r1=1454730&r2=1454731&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java Sat Mar 9 15:09:12 2013
@@ -98,6 +98,8 @@ public class ServiceManagerComponent
private String service_seqno = "service.seqno";
private DuccIdFactory idFactory = new DuccIdFactory();
+ private boolean initialized = false;
+
public ServiceManagerComponent(CamelContext context)
{
super("ServiceManager", context);
@@ -212,6 +214,10 @@ public class ServiceManagerComponent
}
idFactory = new DuccIdFactory(seq);
+
+ synchronized(this) {
+ initialized = true;
+ }
}
@Override
@@ -327,6 +333,7 @@ public class ServiceManagerComponent
if ( w.getDuccType() == DuccType.Reservation ) continue;
if ( !((DuccWorkJob)w).isActive() ) continue; // not active, we don't care about it. likely after restart.
+
logger.debug(methodName, w.getDuccId(), "Reconciling, adding", w.getDuccType());
switch(w.getDuccType()) {
case Job:
@@ -337,8 +344,8 @@ public class ServiceManagerComponent
case Service:
localMap.addDuccWork(w);
// An arbitrary process is **almost** the same as a service in terms of how most of DUCC
- // handles it. In order to transparently reuse all that code it is classified as a
- // special type of service, "other", which the SM treats as a regular job.
+ // handles it. To me (SM), however, it is just like any other job so it goes into
+ // the job map.
switch ( ((IDuccWorkService)w).getServiceDeploymentType() )
{
case uima:
@@ -357,7 +364,7 @@ public class ServiceManagerComponent
}
}
- // Stuff on the left is stuff we have but or doesn't
+ // Stuff on the right is stuff we have but OR doesn't
work = diffmap.getRight();
for ( IDuccWork w : work.values() ) {
if ( w.getDuccType() == DuccType.Reservation ) continue;
@@ -388,34 +395,54 @@ public class ServiceManagerComponent
}
}
- // Now: stuff we both know about
+ // Now: stuff we both know about. Stuff on left is incoming.
for( DuccMapValueDifference<IDuccWork> jd: diffmap ) {
- IDuccWork r = jd.getRight();
IDuccWork l = jd.getLeft();
+ IDuccWork r = jd.getRight();
if ( l.getDuccType() == DuccType.Reservation ) continue;
logger.debug(methodName, l.getDuccId(), "Reconciling, incoming state = ", l.getStateObject(), " my state = ", r.getStateObject());
+ boolean active = ((DuccWorkJob)l).isActive();
// Update our own state by replacing the old (right) object with the new (left)
switch(l.getDuccType()) {
case Job:
- modifiedJobs.put(l.getDuccId(), l);
- localMap.addDuccWork(l);
+ if ( active ) {
+ modifiedJobs.put(l.getDuccId(), l);
+ localMap.addDuccWork(l);
+ } else {
+ deletedJobs.put(l.getDuccId(), l);
+ localMap.removeDuccWork(l.getDuccId());
+ }
break;
case Service:
- localMap.addDuccWork(l);
- switch ( ((IDuccWorkService)l).getServiceDeploymentType() )
- {
- case uima:
- case custom:
- modifiedServices.put(l.getDuccId(), l);
- break;
- case other:
- modifiedJobs.put(l.getDuccId(), l);
- break;
+ if ( active ) {
+ localMap.addDuccWork(l);
+ switch ( ((IDuccWorkService)l).getServiceDeploymentType() )
+ {
+ case uima:
+ case custom:
+ modifiedServices.put(l.getDuccId(), l);
+ break;
+ case other:
+ modifiedJobs.put(l.getDuccId(), l);
+ break;
+ }
+ } else {
+ localMap.removeDuccWork(l.getDuccId());
+ switch ( ((IDuccWorkService)l).getServiceDeploymentType() )
+ {
+ case uima:
+ case custom:
+ deletedServices.put(l.getDuccId(), l);
+ break;
+ case other:
+ deletedJobs.put(l.getDuccId(), l);
+ break;
+ }
}
break;
@@ -658,6 +685,12 @@ public class ServiceManagerComponent
public synchronized void orchestratorStateArrives(DuccWorkMap map)
{
+ String methodName = "orchestratorStateArrives";
+ if ( ! initialized ) {
+ logger.info(methodName, null, "SM not initialized, ignoring Orchestrator message.");
+ return;
+ }
+
epochCounter++;
incomingMap = map;
notify();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1454731&r1=1454730&r2=1454731&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Sat Mar 9 15:09:12 2013
@@ -71,7 +71,7 @@ public class ServiceSet
// key is job/service id, value is same. it's a map for fast existence check
HashMap<DuccId, DuccId> references = new HashMap<DuccId, DuccId>();
- // For a registered service, here is my registered it
+ // For a registered service, here is my registered id
DuccId id;
HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
@@ -416,14 +416,17 @@ public class ServiceSet
*/
void enforceAutostart()
{
+ String methodName = "enforceAutostart";
if ( ! autostart ) return; // not doing auto, nothing to do
if ( stopped ) return; // doing auto, but we've been manually stopped
// could have more implementors than instances if some were started dynamically but the count not persisted
- int needed = Math.max(0, instances - implementors.size());
+ int needed = Math.max(0, instances - friendly_ids.size());
+ logger.debug(methodName, null, "ENFORCE: ", needed);
while ( (needed--) > 0 ) {
start();
+ logger.debug(methodName, null, "ENFORCE: ", needed);
}
}
@@ -1064,6 +1067,7 @@ public class ServiceSet
{
String methodName = "start";
+ logger.debug(methodName, null, "START START START START START START START START");
// if ( service_type == ServiceType.Custom ) {
// establish();
// return;
@@ -1087,7 +1091,13 @@ public class ServiceSet
};
for ( int i = 0; i < args.length; i++ ) {
- logger.debug(methodName, null, "Args[", i, "]:", args[i]);
+ if ( i > 0 && (args[i-1].equals("-cp") ) ) {
+ // The classpaths can be just awful filling the logs with junk. It will end up in the agent log
+ // anyway so let's inhibit it here.
+ logger.debug(methodName, null, "Args[", i, "]: <CLASSPATH>");
+ } else {
+ logger.debug(methodName, null, "Args[", i, "]:", args[i]);
+ }
}
ProcessBuilder pb = new ProcessBuilder(args);
@@ -1124,8 +1134,19 @@ public class ServiceSet
}
// That was annoying. Now search the lines for some hint of the id.
+ boolean inhibit_cp = false;
for ( String s : stdout_lines ) {
- logger.debug(methodName, id, "Start stdout:", s);
+ if ( inhibit_cp ) {
+ // The classpaths can be just awful filling the logs with junk. It will end up in the agent log
+ // anyway so let's inhibit it here.
+ logger.debug(methodName, id, "<INHIBITED CP>");
+ inhibit_cp = false;
+ } else {
+ logger.debug(methodName, id, "Start stdout:", s);
+ if ( s.indexOf("-cp") >= 0 ) {
+ inhibit_cp = true;
+ }
+ }
}
for ( String s : stderr_lines ) {
@@ -1160,6 +1181,7 @@ public class ServiceSet
setServiceState(ServiceState.Initializing);
}
saveMetaProperties();
+ logger.debug(methodName, null, "ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART");
}
/**