You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2014/02/03 20:35:49 UTC
svn commit: r1564022 [2/5] - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/
uima-ducc-examples/src/main/resources/org/apache/uima/ducc/test/servi...
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1564022&r1=1564021&r2=1564022&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Mon Feb 3 19:35:48 2014
@@ -36,8 +36,6 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.ServiceStopEvent;
import org.apache.uima.ducc.transport.event.ServiceUnregisterEvent;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
-import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
-import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.sm.IServiceDescription;
import org.apache.uima.ducc.transport.event.sm.ServiceDependency;
@@ -45,6 +43,7 @@ import org.apache.uima.ducc.transport.ev
+@SuppressWarnings("serial")
public class ServiceHandler
implements SmConstants,
Runnable
@@ -52,7 +51,6 @@ public class ServiceHandler
/**
*
*/
- private static final long serialVersionUID = 1L;
private DuccLogger logger = DuccLogger.getLogger(ServiceHandler.class.getName(), COMPONENT_NAME);
private IServiceManager serviceManager;
@@ -94,17 +92,26 @@ public class ServiceHandler
}
}
}
-
/**
* At boot only ... pass in the set of all known active services to each service so it can update
* internal state with current published state.
*/
- void synchronizeImplementors(Map<DuccId, JobState> servicemap)
+ void bootImplementors(Map<DuccId, DuccWorkJob> incoming)
{
- ArrayList<String> keys = serviceStateHandler.getServiceNames();
- for ( String k : keys ) {
- ServiceSet sset = serviceStateHandler.getServiceByName(k);
- sset.synchronizeImplementors(servicemap);
+ for ( DuccId id : incoming.keySet() ) {
+ DuccWorkJob j = incoming.get(id);
+ String ep = j.getServiceEndpoint();
+ ServiceSet sset = serviceStateHandler.getServiceByUrl(ep);
+ if ( sset == null ) {
+ // must cancel this service, no idea what it is
+ } else {
+ sset.bootImplementor(id, j.getJobState()); // boot by id, job, not known so more stuff
+ // has to be built up
+ }
+ }
+ List<ServiceSet> services = serviceStateHandler.getServices();
+ for ( ServiceSet sset : services ) {
+ sset.bootComplete();
}
}
@@ -147,18 +154,17 @@ public class ServiceHandler
handleNewServices (newServicesMap );
handleModifiedServices(modifiedServicesMap);
handleDeletedServices (deletedServicesMap );
- handleImplicitServices( );
handleNewJobs (newJobsMap );
handleModifiedJobs (modifiedJobsMap );
handleDeletedJobs (deletedJobsMap );
- serviceManager.publish(serviceMap);
-
- List<ServiceSet> regsvcs = serviceStateHandler.getRegisteredServices();
+ List<ServiceSet> regsvcs = serviceStateHandler.getServices();
for ( ServiceSet sset : regsvcs ) {
sset.enforceAutostart();
}
+
+ serviceManager.publish(serviceMap);
}
void signalUpdates( // This is the incoming or map, with work split into categories.
@@ -207,39 +213,12 @@ public class ServiceHandler
}
/**
- * Resolves state for the job in id based on the what it is dependent upon - the independent services
- */
- protected void resolveState(DuccId id, ServiceDependency dep)
- {
- String methodName = "resolveState";
- Map<String, ServiceSet> services = serviceStateHandler.getServicesForJob(id);
- if ( services == null ) {
- dep.setState(ServiceState.NotAvailable); // says that nothing i need is available
- return;
- }
-
- ServiceState state = ServiceState.Available;
- //
- // Start with the most permissive state and reduce it as we walk the list
- // Running > Initializing > Waiting > NotAvailable
- //
- // This sets the state to the min(all dependent service states)
- //
- for ( ServiceSet sset : services.values() ) {
- if ( sset.getServiceState().ordinality() < state.ordinality() ) state = sset.getServiceState();
- dep.setIndividualState(sset.getKey(), sset.getServiceState());
- logger.debug(methodName, id, "Set individual state", sset.getServiceState());
- }
- dep.setState(state);
- }
-
- /**
* This is called when an endpoint is referenced as a dependent service from a job or a service.
* It is called only when a new job or service is first discovered in the OR map.
*/
protected Map<String, ServiceSet> resolveDependencies(DuccWorkJob w, ServiceDependency s)
{
- String methodName = "resolveDependencies";
+ //String methodName = "resolveDependencies";
DuccId id = w.getDuccId();
String[] deps = w.getServiceDependencies();
@@ -249,100 +228,73 @@ public class ServiceHandler
for ( String dep : deps ) {
// put it into the global map of known services if needed and up the ref count
- ServiceSet sset = serviceStateHandler.getServiceByName(dep);
- if ( sset == null ) { // first time, so it's by reference only
- try {
- sset = new ServiceSet(dep, serviceManager.newId());
- serviceStateHandler.putServiceByName(dep, sset);
- } catch ( Exception e ) { // if 'dep' is invalid, or we can't get a duccid, we throw
- s.addMessage(dep, e.getMessage());
+ ServiceSet sset = serviceStateHandler.getServiceByUrl(dep);
+ if ( sset == null ) {
+
+ // Not good. Lets see if it's a terminating service so we can at least tell the poor guy.
+ sset = serviceStateHandler.getUnregisteredServiceByUrl(dep);
+ if ( sset == null ) {
+ // Still null, never h'oid of de guy
+ s.addMessage(dep, "Independent registered service [" + dep + "] is unknown.");
+ s.setState(ServiceState.NotAvailable);
+ } else {
+ // The service is deregistered but not yet purged, may as well tell him. It can
+ // take a while for these guys to go away.
+ s.addMessage(dep, "Independent registered service [" + dep + "] has been deregistered and is terminating.");
s.setState(ServiceState.NotAvailable);
- fatal = true;
- continue;
}
- }
-
- if ( sset.isDeregistered() ) {
- // Registerered services only - the service might even still be alive because it can
- // take a while to get rid of these guys - we need to be sure we don't attach any
- // new jobs to it.
- s.addMessage(dep, "Independent registered service [" + dep + "] has been deregistered and is terminating.");
- s.setState(ServiceState.NotAvailable);
fatal = true;
continue;
}
- //
- // We try to vet all services so the message is complete. If we've already had some fatal problems
- // we need to bypass any attempt to cope with registered services or updating the sset.
- //
- if ( ! fatal ) {
- if ( sset.isRegistered() && (sset.countImplementors() == 0) && sset.isStartable() ) {
- // Registered but not alive, well, we can fix that!
- int ninstances = sset.getNInstances();
- logger.debug(methodName, sset.getId(), "Reference-starting registered service, instances =", ninstances);
- if ( ! sset.isAutostart() ) { // must avoid races ith autostart for referenced start
- sset.setReferencedStart(true);
- }
- for ( int i = 0; i < ninstances; i++ ) {
- if ( ! sset.start() ) {
- s.addMessage(dep, "Can't start independent service.");
- s.setState(ServiceState.NotAvailable);
- break;
- }
- }
- }
-
- jobServices.put(dep, sset);
- sset.reference(id);
- serviceStateHandler.putServiceForJob(w.getDuccId(), sset);
- logger.debug(methodName, id, "Service init ok. Ref[", dep, "] incr to", sset.countReferences());
- }
+ jobServices.put(dep, sset);
}
if ( fatal ) {
jobServices.clear();
+ } else {
+ for ( ServiceSet sset : jobServices.values() ) {
+ // If service is unregistered and then reregistered while the job is running it may have lost
+ // its connections, which we ensure we always have here.
+ if ( serviceStateHandler.getServicesForJob(w.getDuccId()) == null ) {
+ sset.reference(id); // might start it if it's not running
+ serviceStateHandler.putServiceForJob(w.getDuccId(), sset);
+ }
+ }
}
return jobServices;
}
-
- protected void handleNewJobs(Map<DuccId, IDuccWork> work)
- {
- String methodName = "handleNewJobs";
-
- // Map of updates to send to OR
- HashMap<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>();
-
- for ( DuccId id : work.keySet() ) {
- DuccWorkJob w = (DuccWorkJob) work.get(id);
-
- if ( !w.isActive() ) {
- logger.info(methodName, id, "Bypassing inactive job, state =", w.getStateObject());
- continue;
- }
- ServiceDependency s = new ServiceDependency(); // for the OR
- updates.put(id, s);
-
- String[] deps = w.getServiceDependencies();
- if ( deps == null ) { // no deps, just mark it running and move on
- s.setState(ServiceState.Available);
- logger.info(methodName, id, "Added to map, no service dependencies.");
- continue;
- }
-
- //
- // Get dependency references and fire up their state machines
- //
- Map<String, ServiceSet> jobServices = resolveDependencies(w, s);
- for ( ServiceSet sset : jobServices.values() ) {
- sset.establish();
- }
- resolveState(id, s);
- logger.info(methodName, id, "Added job to map, with service dependency state.", s.getState());
+ /**
+ * Resolves state for the job in id based on what it is dependent upon - the independent services
+ *
+ * Enter this code ONLY if it is determined that the 'independent' work, 'id', does in fact have
+ * declared dependencies.
+ *
+ * @param id This is the ID of a job or service we want to work out the service state for
+ * @param dep This is the thing we send to OR telling it about the state of 'id'
+ */
+ protected void resolveState(DuccId id, ServiceDependency dep)
+ {
+ Map<Long, ServiceSet> services = serviceStateHandler.getServicesForJob(id);
+ if ( services == null ) {
+ dep.setState(ServiceState.NotAvailable); // says that nothing i need is available
+ return;
}
- serviceMap.putAll(updates);
+ ServiceState state = ServiceState.Available;
+ //
+ // Start with the most permissive state and reduce it as we walk the list
+ // Running > Initializing > Waiting > NotAvailable
+ //
+ // This sets the state to the min(all dependent service states)
+ //
+ for ( ServiceSet sset : services.values() ) {
+ if ( sset.getState().ordinality() < state.ordinality() ) state = sset.getState();
+ dep.setIndividualState(sset.getKey(), sset.getState());
+ // logger.debug(methodName, id, "Set individual state", sset.getState());
+ }
+ dep.setState(state);
}
/**
@@ -354,9 +306,9 @@ public class ServiceHandler
{
String methodName = "stopDependentServices";
- Map<String, ServiceSet> deps = serviceStateHandler.getServicesForJob(id);
+ Map<Long, ServiceSet> deps = serviceStateHandler.getServicesForJob(id);
if ( deps == null ) {
- logger.debug(methodName, id, "No dependent services to stop, returning.");
+ logger.info(methodName, id, "No dependent services to stop, returning.");
return; // service already deleted, timing issue
}
@@ -364,61 +316,64 @@ public class ServiceHandler
// Bop through all the things job 'id' is dependent upon, and update their refcounts. If
// the refs go to 0 we stop the pinger and sometimes the independent service itself.
//
- for ( String dep : deps.keySet() ) {
- logger.debug(methodName, id, "Looking up service", dep);
+ for ( Long depid : deps.keySet() ) {
+ logger.debug(methodName, id, "Looking up service", depid);
- ServiceSet sset = deps.get(dep);
+ ServiceSet sset = deps.get(depid);
if ( sset == null ) {
- throw new IllegalStateException("Null service for " + dep); // sanity check, should never happen
+ logger.error(methodName, id, "Internal error: Null service for " + depid); // sanity check, should never happen
+ continue;
}
- int count = sset.dereference(id); // also maybe stops the pinger
- logger.debug(methodName, id, "Ref count for", sset.getKey(), "goes down to", count);
- if ( count == 0 ) {
- if ( sset.isImplicit() ) {
- logger.debug(methodName, id, "Removing unreferenced implicit service", dep, "refcount", count);
- serviceStateHandler.removeService(dep);
- }
- if ( sset.isRegistered() && sset.isReferencedStart() ) {
- if ( sset.isAutostart() ) { // could have happened after the reference
- sset.setReferencedStart(false); // so we don't linger, just reset
- } else {
- logger.debug(methodName, id, "Stopping reference-started service", dep, "refcount", count);
- sset.lingeringStop();
- }
- }
+ sset.dereference(id); // also maybe stops the pinger
- }
}
- // last, indicate that job 'id' has nothing its dependent upon any more
+ // last, indicate that job 'id' has nothing it's dependent upon any more
serviceStateHandler.removeServicesForJob(id);
}
+
+ protected void handleNewJobs(Map<DuccId, IDuccWork> work)
+ {
+ String methodName = "handleNewJobs";
- protected void handleDeletedJobs(Map<DuccId, IDuccWork> work)
- {
- String methodName = "handleCompletedJobs";
+ // Map of updates to send to OR
+ HashMap<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>();
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
-
+
+ if ( !w.isActive() ) {
+ logger.info(methodName, id, "Bypassing inactive job, state =", w.getStateObject());
+ continue;
+ }
+
+ ServiceDependency s = new ServiceDependency(); // for the OR
+ updates.put(id, s);
+
String[] deps = w.getServiceDependencies();
if ( deps == null ) { // no deps, just mark it running and move on
- logger.info(methodName, id, "No service dependencies, no updates made.");
+ s.setState(ServiceState.Available);
+ logger.info(methodName, id, "Added to map, no service dependencies.");
continue;
}
- stopDependentServices(id);
+ Map<String, ServiceSet> jobServices = resolveDependencies(w, s);
+ for ( ServiceSet sset : jobServices.values() ) {
+ logger.info(methodName, id, "Job is dependent on", sset.getKey());
+ }
- logger.info(methodName, id, "Deleted job from map");
+ resolveState(id, s);
+ logger.info(methodName, id, "Added job to map, with service dependency state.", s.getState());
}
- serviceMap.removeAll(work.keySet());
+ serviceMap.putAll(updates);
+
}
protected void handleModifiedJobs(Map<DuccId, IDuccWork> work)
{
- String methodName = "handleModifiedJobs";
+ String methodName = "handleModifiedobs";
//
// Only look at active jobs. The others will be going away soon and we use
@@ -442,24 +397,46 @@ public class ServiceHandler
s.setState(ServiceState.NotAvailable);
s.clearMessages();
} else if ( j.isActive() ) {
+ resolveDependencies(j, s);
resolveState(id, s);
}
}
}
+ protected void handleDeletedJobs(Map<DuccId, IDuccWork> work)
+ {
+ String methodName = "handleDeletedobs";
+
+ for ( DuccId id : work.keySet() ) {
+ DuccWorkJob w = (DuccWorkJob) work.get(id);
+
+ String[] deps = w.getServiceDependencies();
+ if ( deps == null ) { // no deps, just mark it running and move on
+ logger.info(methodName, id, "No service dependencies, no updates made.");
+ continue;
+ }
+
+ stopDependentServices(id);
+
+ logger.info(methodName, id, "Deleted job from map");
+ }
+
+ serviceMap.removeAll(work.keySet());
+
+ }
+
protected void handleNewServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleNewServices";
- Map<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>(); // to be added to the service map sent to OR
- Map<String, ServiceSet> newservices = new HashMap<String, ServiceSet>(); // to be added to our internal maps in serviceState
+ Map<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>(); // to be added to the service map sent to OR
+
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
//
// On restart we sometimes get stale stuff that we just ignore.
- // What else? Is the the right thing to do?
//
if ( !w.isActive() ) {
logger.info(methodName, id, "Bypassing inactive service, state=", w.getStateObject());
@@ -479,59 +456,12 @@ public class ServiceHandler
}
String[] deps = w.getServiceDependencies(); // other services this svc depends on
- ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
+ ServiceSet sset = serviceStateHandler.getServiceByImplementor(id.getFriendly());
if ( sset == null ) {
- // submitted, we just track but not much else
- try {
- sset = new ServiceSet(serviceManager.newId(), id, endpoint, deps); // creates a "submitted" service
- sset.addImplementor(id, w.getJobState());
- serviceStateHandler.putServiceByName(endpoint, sset);
- } catch ( Exception e ) {
- s.addMessage(endpoint, e.getMessage());
- s.setState(ServiceState.NotAvailable);
- continue;
- }
- } else if ( sset.isDeregistered() ) {
- s.addMessage(endpoint, "Duplicate endpoint: terminating deregistered service.");
+ s.addMessage(endpoint, "No registered service for " + endpoint);
s.setState(ServiceState.NotAvailable);
continue;
- } else if ( sset.matches(id) ) {
- // TODO: not clear we have to do anything here since establish() below will
- // add to the implementors. Be sure to update the check so the
- // code in the following 'else' clause is executed correctly though.
-
- // and instance/implementor of our own registered services
- sset.addImplementor(id, w.getJobState());
- } else {
- //
- // If the new service is not a registered service, and it is a duplicate of another service
- // which isn't registered, we allow it to join the party.
- //
- // When it joins, it needs to "propmote" the ServiceSet to "Submitted".
- //
- // a) in the case of "implicit" we don't know enough to many any moral judgements at all
- // b) in the case of "submitted" it could be the user is increasing the pool of servers by
- // submitting more jobs. Perhaps we would better handle this via modify but for the moment,
- // just allow it.
- // c) in the case of "registered" we know and manage everything and don't allow it. users must
- // use the services modify api to increase or decrease instances.
- //
-
- if ( !sset.isRegistered() ) {
- sset.addImplementor(id, w.getJobState());
- sset.promote(); // we'll do this explicitly as a reminder that it's happening and
- // to insure we NEVER promote a registered service (which is actually
- // a demotion!).
- } else {
- String msg = "Duplicate endpoint: Registered service.";
- logger.warn(methodName, id, msg);
- s.addMessage(endpoint, msg);
- s.setState(ServiceState.NotAvailable);
- continue;
- }
- }
-
- // The service is new and unique if we get this far
+ }
//
// No deps. Put it in the map and move on.
@@ -539,180 +469,106 @@ public class ServiceHandler
if ( deps == null ) {
logger.info(methodName, id, "Added service to map, no service dependencies. ");
s.setState(ServiceState.Available); // good to go in the OR (the state of things i'm dependent upon)
- sset.establish(id, w.getJobState()); // sets my own state based entirely on state of w
+ sset.signalUpdate(w);
continue;
}
- Map<String, ServiceSet> jobServices = resolveDependencies(w, s); //
- for ( ServiceSet depset : jobServices.values() ) {
- depset.establish();
- }
- resolveState(id, s);
- sset.establish(id, w.getJobState());
+ resolveDependencies(w, s); // check what I depend on and maybe kick 'em
+ resolveState(id, s); // get cumulative state based on my deps
+
+ sset.signalUpdate(w); // kick my own instance
logger.info(methodName, id, "Added to map, with service dependencies,", s.getState());
}
- serviceStateHandler.recordNewServices(newservices);
- serviceMap.putAll(updates);
+ serviceMap.putAll(updates); // for return to OR
}
- //
- // We're here because we got OR state for the service that it has stopped running.
- // Must clean up.
- //
- protected void handleDeletedServices(Map<DuccId, IDuccWork> work)
- {
- String methodName = "handleDeletedServices";
-
- for ( DuccId id : work.keySet() ) {
- DuccWorkJob w = (DuccWorkJob) work.get(id);
- String endpoint = w.getServiceEndpoint();
- logger.info(methodName, id, "Deleted service:", endpoint);
-
- //
- // Dereference and maybe stop the services I'm dependent upon
- //
- if ( w.getServiceDependencies() == null ) {
- logger.info(methodName, id, "No service dependencies to update on removal.");
- } else {
- stopDependentServices(id); // update references, remove implicit services if any
- }
-
- if (endpoint == null ) { // probably impossible but lets not chance NPE
- logger.warn(methodName, id, "Missing service endpoint, ignoring.");
- continue;
- }
- ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
-
- // may have been removed already if we saw it go to complet[ed/ing] and it lingered a while anyway, which is usual
- if ( sset != null ) {
- sset.removeImplementor(id); // also stops the ping thread if it's the last one
- }
- }
-
- //serviceStateHandler.removeServicesForJobs(work.keySet()); // services we were dependent upon
- serviceMap.removeAll(work.keySet()); // and finally the deleted services
- // from the published map
- }
-
/**
- * The pinger may have died while we weren't looking. Registered services take care
- * of themselves from handleModifiedServices, but we know very little about implicit
- * services so we walk them all and make their ServiceSet keep them clean.
+ * The assumption here is that we already had the service instance in our map, and OR is
+ * delivering an update. That means the instance was known to us in the past, it is not new.
*/
- protected void handleImplicitServices()
- {
- ArrayList<String> keys = serviceStateHandler.getServiceNames();
- for ( String k : keys ) {
- ServiceSet sset = serviceStateHandler.getServiceByName(k);
- if ( sset.isImplicit() ) {
- sset.establish();
- }
- }
- }
-
protected void handleModifiedServices(Map<DuccId, IDuccWork> work)
{
- String methodName = "handleModifiedServices";
+ String methodName = "handleModifiedServices";
//
// This is a specific service process, but not necessarily the whole service.
//
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
- String endpoint = w.getServiceEndpoint();
+ String url = w.getServiceEndpoint();
- if (endpoint == null ) { // probably impossible but lets not chance NPE
- logger.info(methodName, id, "Missing service endpoint, ignoring.");
+ if (url == null ) { // probably impossible but lets not chance NPE
+ logger.warn(methodName, id, "Missing service endpoint/url, ignoring.");
continue;
}
- ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
+ ServiceSet sset = serviceStateHandler.getServiceByImplementor(id.getFriendly());
if ( sset == null ) {
- // may have already died and this is just leftover OR publications.
- if ( w.isActive() ) { // or maybe we just screwed up!
- logger.info(methodName, id, "Got update for active service instance", id.toString(), "but no ServiceSet! Job state:", w.getJobState());
+ sset = serviceStateHandler.getUnregisteredServiceByUrl(url);
+ if ( sset == null ) {
+ // leftover junk publication maybe? can't tell
+ logger.info(methodName, id, "Update for active service instance", id.toString(),
+ "but have no registration for it. Job state:", w.getJobState());
continue;
}
- continue;
+ logger.info(methodName, id, "Update for unregistered service, continuing shutdown of service. Job State:", w.getJobState());
}
if ( !sset.containsImplementor(id) ) {
- logger.info(methodName, id, "Bypassing removed service instance for", endpoint);
- continue;
+ if ( !sset.canDeleteInstance(w) ) {
+ // the instance isn't dead, this is a possible problem
+ logger.warn(methodName, id, "sset for", sset.getId(), "does not contain instance");
+ }
+ continue; // we don't care any more, he's gone
}
ServiceDependency s = serviceMap.get(id);
if ( w.isFinished() ) { // nothing more, just dereference and maybe stop stuff I'm dependent upon
+ // state Completing or Completed
stopDependentServices(id);
s.setState(ServiceState.NotAvailable); // tell orchestrator
- } else if ( w.getServiceDependencies() != null ) { // update state from things I'm dependent upon
+ } else if ( w.getServiceDependencies() != null ) { // update state from things I'm dependent upon
+ resolveDependencies(w, s);
resolveState(id, s);
- }
+ }
- // See what happened to the instance ...
- if ( w.isActive() ) {
- // Hard to know for sure, if there are a bunch of instances, some working and some not, how to manage this.
- // But this is a state *change* of something, and the something is active, so probably the service is OK now
- // if it hadn't been before.
-
- // Need to be cautious here - this will get reset if ANYthing is running. So we could have a bunch
- // of live instances and some new ones, where the live ones are ok but for some reason we can't start
- // new ones, in which case this gets set too often.
- //
- // This seems like it would be rare and since we aren't actually pounding restarts (only attempts every
- // SM cycle) maybe its ok. The alternative is to track state changes which is added complexity - for
- // waht gain, we need to determine with experience.
- //
- // I suppose the ServiceManagerHandler could easily track the per-process state change - we'd have to
- // modify the thing in the map it passes in to show 'before' and 'after' states instead of just passing
- // in the DuccWork thing.
- //
- JobState state = w.getJobState();
- if ( state == JobState.Running ) { // only if we confirm it's alive
- sset.resetRunFailures();
- }
+ sset.signalUpdate(w);
+ }
+
+ }
+
+ protected void handleDeletedServices(Map<DuccId, IDuccWork> work)
+
+ {
+ String methodName = "handleDeletedServices";
+
+ for ( DuccId id : work.keySet() ) {
+ DuccWorkJob w = (DuccWorkJob) work.get(id);
+ String url = w.getServiceEndpoint();
+ logger.info(methodName, id, "Instance deleted for", url);
+
+ if (url == null ) { // probably impossible but lets not chance NPE
+ logger.warn(methodName, id, "Missing service endpoint, ignoring.");
+ continue;
+ }
+
+ //
+ // Dereference and maybe stop the services I'm dependent upon
+ //
+ if ( w.getServiceDependencies() == null ) {
+ logger.info(methodName, id, "No service dependencies to update on removal.");
} else {
- JobState state = w.getJobState();
-
- if ( state == JobState.Completed ) {
- sset.removeImplementor(id);
- JobCompletionType jct = w.getCompletionType();
-
- logger.info(methodName, id, "Removing stopped instance from maps: state[", state, "] completion[", jct, "]");
- switch ( jct ) {
- case EndOfJob:
- case CanceledByUser:
- case CanceledByAdministrator:
- case Undefined:
- break;
- default:
- logger.debug(methodName, id, "RECORDING FAILURE");
- // all other cases are errors that contribute to the error count
- if ( sset.excessiveRunFailures() ) { // if true, the count is exceeeded, but reset
- logger.warn(methodName, null, "Process Failure: " + jct + " Maximum consecutive failures[" + sset.failure_run + "] max [" + sset.failure_max + "]");
- } else {
- sset.start();
- }
- break;
- }
- }
+ stopDependentServices(id); // update references, remove implicit services if any
}
- // Now factor in cumulative state of the implementors and manage the ping thread as needed
- sset.establish(id, w.getJobState());
-
- if ( (sset.getServiceState() == ServiceState.NotAvailable) && (sset.countReferences() == 0) && (sset.countImplementors() == 0) ) {
- // this service is now toast. remove from our maps asap to avoid clashes if it gets
- // resubmitted before the OR can purge it.
- if ( ! sset.isRegistered() ) {
- logger.debug(methodName, id, "Removing service", endpoint, "because it died and has no more references.");
- serviceStateHandler.removeService(endpoint);
- }
- serviceStateHandler.removeServicesForJob(id);
+ ServiceSet sset = serviceStateHandler.getServiceByImplementor(id.getFriendly());
+ if ( sset != null ) { // can happen on unregister
+ sset.signalUpdate(w);
}
}
-
+
+ serviceMap.removeAll(work.keySet()); // and finally the deleted services
}
/**
@@ -720,63 +576,41 @@ public class ServiceHandler
*/
void updateServiceQuery(IServiceDescription sd, ServiceSet sset)
{
-
- if ( sset.isRegistered() ) {
- //
- // The thing may not be running yet / at-all. Pull out the deps from the registration and
- // query them individually.
- //
- String[] deps = sset.getIndependentServices();
- if ( deps != null ) {
- for ( String dep : deps ) {
- ServiceSet independent = serviceStateHandler.getServiceByName(dep);
- if ( independent != null ) {
- sd.addDependency(dep, independent.getServiceState().decode());
- } else {
- sd.addDependency(dep, ServiceState.NotAvailable.decode());
- }
+ //
+ // The thing may not be running yet / at-all. Pull out the deps from the registration and
+ // query them individually.
+ //
+ String[] deps = sset.getIndependentServices();
+ if ( deps != null ) {
+ for ( String dep : deps ) {
+ ServiceSet independent = serviceStateHandler.getServiceByUrl(dep);
+ if ( independent != null ) {
+ sd.addDependency(dep, independent.getState().decode());
+ } else {
+ sd.addDependency(dep, ServiceState.Stopped.decode());
}
}
- } else {
- //
- // If it's not registered we have to look up all the dependencies of the implementors instead
- //
- Map<DuccId, JobState> implementors = sset.getImplementors();
- for ( DuccId id : implementors.keySet() ) {
- Map<String, ServiceSet> deps = serviceStateHandler.getServicesForJob(id); // all the stuff 'id' is dependent upon
- if ( deps != null ) {
- for ( String s : deps.keySet() ) {
- ServiceSet depsvc = deps.get(s);
- sd.addDependency(s, depsvc.getServiceState().decode());
- }
- }
- }
}
}
ServiceQueryReplyEvent query(ServiceQueryEvent ev)
{
//String methodName = "query";
- long friendly = ev.getFriendly();
- String epname = ev.getEndpoint();
+ long id = ev.getFriendly();
+ String url = ev.getEndpoint();
ServiceQueryReplyEvent reply = new ServiceQueryReplyEvent();
- if (( friendly == -1) && ( epname == null )) {
- ArrayList<String> keys = serviceStateHandler.getServiceNames();
- for ( String k : keys ) {
- ServiceSet sset = serviceStateHandler.getServiceByName(k);
- if ( k == null ) continue; // the unlikely event it changed out from under us
-
+ if (( id == -1) && ( url == null )) {
+ for ( ServiceSet sset : serviceStateHandler.getServices()) {
IServiceDescription sd = sset.query();
updateServiceQuery(sd, sset);
reply.addService(sd);
}
} else {
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+ ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
- reply.setMessage("Unknown");
- reply.setEndpoint(epname);
- reply.setId(friendly);
+ reply.setMessage("Unknown service: ID[" + id + "] Endpoint[" + url + "]");
+ reply.setEndpoint(url);
reply.setReturnCode(false);
} else {
IServiceDescription sd = sset.query();
@@ -792,52 +626,44 @@ public class ServiceHandler
{
//String methodName = "start";
- long friendly = ev.getFriendly();
- String epname = ev.getEndpoint();
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+ long id = ev.getFriendly();
+ String url = ev.getEndpoint();
+ String serviceIdString = extractId(id, url);
+ ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
- return new ServiceReplyEvent(false, "Unknown", epname, friendly);
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " does not exist.", null, id);
}
- String userin = ev.getUser();
+ String userin = ev.getUser();
String userout = sset.getUser();
if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
- return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " Start declined: not owner.", serviceIdString, id);
}
- if ( sset.isRegistered() ) {
- int running = sset.countImplementors();
- int instances = ev.getInstances();
- int registered = sset.getNInstances();
- int wanted = 0;
-
- if ( instances == -1 ) {
- wanted = Math.max(0, registered - running);
- } else {
- wanted = instances;
- }
- if ( wanted == 0 ) {
- return new ServiceReplyEvent(true,
- "Already has instances[" + running + "] - no additional instances started",
- sset.getKey(),
- sset.getId().getFriendly());
- }
-
- pendingRequests.add(new ApiHandler(ev, this));
-
-// // only start something if we don't have enought already going
-// ApiHandler apih = new ApiHandler(ev, this);
-// Thread t = new Thread(apih);
-// t.start();
-
+ int running = sset.countImplementors();
+ int instances = ev.getInstances();
+ int registered = sset.getNInstances();
+ int wanted = 0;
+
+ if ( instances == -1 ) {
+ wanted = Math.max(0, registered - running);
+ } else {
+ wanted = instances;
+ }
+ if ( wanted == 0 ) {
return new ServiceReplyEvent(true,
- "New instances[" + wanted + "]",
+ "Service " + serviceIdString + " instances[" + running + "], no additional instances started. ",
sset.getKey(),
sset.getId().getFriendly());
- } else {
- return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
}
+
+ pendingRequests.add(new ApiHandler(ev, this));
+
+ return new ServiceReplyEvent(true,
+ "Service " + serviceIdString + " start request accepted, new instances[" + wanted + "]",
+ sset.getKey(),
+ sset.getId().getFriendly());
}
//
@@ -864,43 +690,36 @@ public class ServiceHandler
if ( update ) {
sset.setNInstances(running + instances);
+ sset.saveMetaProperties();
}
-
- sset.resetRunFailures(); // manual start overrides, if there's still a problem
- // the service will be stopped soon anyway.
- for ( int i = 0; i < wanted; i++ ) {
- if ( !sset.start() ) break;
- }
+ sset.setStarted(); // manual start overrides, if there's still a problem
+ sset.start(wanted);
}
ServiceReplyEvent stop(ServiceStopEvent ev)
{
- long friendly = ev.getFriendly();
- String epname = ev.getEndpoint();
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+ long id = ev.getFriendly();
+ String url = ev.getEndpoint();
+ String serviceIdString = extractId(id, url);
+ ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
- return new ServiceReplyEvent(false, "Unknown", epname, friendly);
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " does not exist.", null, id);
}
String userin = ev.getUser();
String userout = sset.getUser();
if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
- return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " Start declined: not owner.", serviceIdString, id);
}
- if ( sset.isRegistered() ) {
- if ( sset.isStopped() ) {
- return new ServiceReplyEvent(false, "Already stopped", sset.getKey(), sset.getId().getFriendly());
- }
-
- pendingRequests.add(new ApiHandler(ev, this));
- return new ServiceReplyEvent(true, "Stopping", sset.getKey(), sset.getId().getFriendly());
- } else {
- return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
+ if ( sset.isStopped() ) {
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " is already stopped.", sset.getKey(), sset.getId().getFriendly());
}
+ pendingRequests.add(new ApiHandler(ev, this));
+ return new ServiceReplyEvent(true, "Service " + serviceIdString + " stop request accepted.", sset.getKey(), sset.getId().getFriendly());
}
//
@@ -910,11 +729,11 @@ public class ServiceHandler
// Otherwise we just stop the number asked for
 // If --save is indicated we update the registry
//
- void doStop(long friendly, String epname, int instances, boolean update)
+ void doStop(long id, String url, int instances, boolean update)
{
//String methodName = "doStop";
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+ ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
int running = sset.countImplementors();
int tolose;
@@ -926,6 +745,7 @@ public class ServiceHandler
if ( update ) {
sset.setNInstances(Math.max(0, running - instances)); // never persist < 0 registered instance
+ sset.saveMetaProperties();
}
if ( tolose == running ) {
@@ -933,6 +753,7 @@ public class ServiceHandler
} else {
sset.stop(tolose); // selective stop
}
+
}
ServiceReplyEvent register(DuccId id, String props_filename, String meta_filename, DuccProperties props, DuccProperties meta)
@@ -942,96 +763,104 @@ public class ServiceHandler
String error = null;
boolean must_deregister = false;
+ String url = meta.getProperty("endpoint");
+ if ( serviceStateHandler.getServiceByUrl(url) != null ) {
+ return new ServiceReplyEvent(false, "Duplicate service: " + url + ". Registration fails", url, id.getFriendly());
+ }
+
ServiceSet sset = null;
try {
- sset = new ServiceSet(id, props_filename, meta_filename, props, meta);
+ sset = new ServiceSet(this, id, props_filename, meta_filename, props, meta);
} catch (Throwable t) {
+ // throws because endpoint is not parsable
error = t.getMessage();
- return new ServiceReplyEvent(false, t.getMessage(), "?", id.getFriendly());
+ return new ServiceReplyEvent(false, t.getMessage(), url, id.getFriendly());
}
- String key = sset.getKey();
-
- // Check if already registered
- ServiceSet sset0 = serviceStateHandler.getServiceByName(key);
- if ( sset0 != null ) {
- error = ("Duplicate owned by: " + sset0.getUser());
- } else {
- try {
- sset.saveServiceProperties();
- } catch ( Exception e ) {
- error = ("Internal error; unable to store service descriptor. " + key);
- logger.error(methodName, id, e);
- must_deregister = true;
- }
-
- try {
- if ( ! must_deregister ) {
- sset.saveMetaProperties();
- }
- } catch ( Exception e ) {
- error = ("Internal error; unable to store service meta-descriptor. " + key);
- logger.error(methodName, id, e);
- must_deregister = true;
- }
-
- // must check for cycles or we can deadlock
+ try {
+ sset.saveServiceProperties();
+ } catch ( Exception e ) {
+ error = ("Internal error; unable to store service descriptor. " + url);
+ logger.error(methodName, id, e);
+ must_deregister = true;
+ }
+
+ try {
if ( ! must_deregister ) {
- CycleChecker cc = new CycleChecker(sset);
- if ( cc.hasCycle() ) {
- error = ("Service dependencies contain a cycle with " + cc.getCycles());
- logger.error(methodName, id, error);
- must_deregister = true;
- }
+ sset.saveMetaProperties();
}
+ } catch ( Exception e ) {
+ error = ("Internal error; unable to store service meta-descriptor. " + url);
+ logger.error(methodName, id, e);
+ must_deregister = true;
+ }
+
+ // must check for cycles or we can deadlock
+ if ( ! must_deregister ) {
+ // TODO R2, revive the cycle checker
+ // CycleChecker cc = new CycleChecker(sset);
+ // if ( cc.hasCycle() ) {
+ // error = ("Service dependencies contain a cycle with " + cc.getCycles());
+ // logger.error(methodName, id, error);
+ // must_deregister = true;
+ // }
}
if ( error == null ) {
- serviceStateHandler.putServiceByName(sset.getKey(), sset);
- return new ServiceReplyEvent(true, "Registered", key, id.getFriendly());
+ serviceStateHandler.registerService(id.getFriendly(), url, sset);
+ return new ServiceReplyEvent(true, "Registered service.", url, id.getFriendly());
} else {
File mf = new File(meta_filename);
mf.delete();
File pf = new File(props_filename);
pf.delete();
- return new ServiceReplyEvent(false, error, key, id.getFriendly());
+ return new ServiceReplyEvent(false, error, url, id.getFriendly());
}
}
public ServiceReplyEvent modify(ServiceModifyEvent ev)
{
- long friendly = ev.getFriendly();
- String epname = ev.getEndpoint();
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+ long id = ev.getFriendly();
+ String url = ev.getEndpoint();
+ String serviceIdString = extractId(id, url);
+ ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
- return new ServiceReplyEvent(false, "Unknown", epname, friendly);
+ return new ServiceReplyEvent(false, "Unrecognized service ID[" + id + "] Endpoint[" + ((url == null) ? "N/A" : url) + "]", "?", id);
}
String userin = ev.getUser();
String userout = sset.getUser();
if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
- return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " Start declined: not owner.", serviceIdString, id);
}
- if ( sset.isRegistered() ) {
- pendingRequests.add(new ApiHandler(ev, this));
-// ApiHandler apih = new ApiHandler(ev, this);
-// Thread t = new Thread(apih);
-// t.start();
- return new ServiceReplyEvent(true, "Modifing", sset.getKey(), sset.getId().getFriendly());
- } else {
- return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
- }
+ pendingRequests.add(new ApiHandler(ev, this));
+ return new ServiceReplyEvent(true, "Service " + serviceIdString + " modify request accepted.", sset.getKey(), sset.getId().getFriendly());
}
- void doModify(long friendly, String epname, int instances, Trinary autostart, boolean activate)
+ //void doModify(long id, String url, int instances, Trinary autostart, boolean activate)
+ void doModify(ServiceModifyEvent sme)
{
- //String methodName = "doStop";
+ String methodName = "doModify";
+
+ long id = sme.getFriendly();
+ String url = sme.getEndpoint();
+ ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
+
+ DuccProperties mods = sme.getProperties();
+
+ String pingArguments = mods.getStringProperty("service_ping_arguments", null);
+ String pingClass = mods.getStringProperty("service_ping_class" , null);
+ String pingClasspath = mods.getStringProperty("service_ping_classpath", null);
+ String pingJvmArgs = mods.getStringProperty("service_ping_jvm_args" , null);
+ String pingTimeout = mods.getStringProperty("service_ping_timeout" , null);
+ String pingDolog = mods.getStringProperty("service_ping_dolog" , null);
+
+ int instances = mods.getIntProperty("instances", -1);
+ boolean activate = mods.getBooleanProperty("activate", true);
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
-
if ( instances > 0 ) {
sset.setNInstances(instances); // also persists instances
if ( activate ) {
@@ -1039,81 +868,134 @@ public class ServiceHandler
int diff = instances - running;
if ( diff > 0 ) {
- while ( diff-- > 0 ) {
- if ( !sset.start() ) break;
- }
+ sset.start(diff);
} else if ( diff < 0 ) {
sset.stop(-diff);
}
}
}
- if ( autostart != Trinary.Unset ) {
- sset.setAutostart(autostart.decode());
- if ( activate ) {
- sset.enforceAutostart();
- }
+ if ( mods.containsKey("autostart" ) ) {
+ sset.setAutostart(mods.getBooleanProperty("autostart", true));
+ }
+
+ if ( pingArguments != null ) sset.setJobProperty("service_ping_arguments", pingArguments);
+ if ( pingClass != null ) sset.setJobProperty("service_ping_class" , pingClass);
+ if ( pingClasspath != null ) sset.setJobProperty("servcie_ping_classpath", pingClasspath);
+ if ( pingJvmArgs != null ) sset.setJobProperty("service_jvm_args" , pingJvmArgs);
+ if ( pingTimeout != null ) sset.setJobProperty("service_ping_timeout" , pingTimeout);
+ if ( pingDolog != null ) sset.setJobProperty("service_ping_dolog" , pingDolog);
+
+ try {
+ sset.saveServiceProperties();
+ sset.saveMetaProperties();
+ } catch ( Exception e ) {
+ logger.error(methodName, null, "Service", id, ": Internal error, unable to store service descriptor. " + url);
+ }
+
+ // Must restart pinger if anything about it changed.
+ if ( (pingClass != null) ||
+ (pingArguments != null) ||
+ (pingClasspath != null) ||
+ (pingJvmArgs != null) ||
+ (pingTimeout != null) ||
+ (pingDolog != null) ) {
+ sset.restartPinger(pingClass, pingArguments, pingClasspath, pingJvmArgs, pingTimeout, pingDolog);
}
}
public ServiceReplyEvent unregister(ServiceUnregisterEvent ev)
{
- long friendly = ev.getFriendly();
- String epname = ev.getEndpoint();
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+ String methodName = "unregister";
+ long id = ev.getFriendly();
+ String url = ev.getEndpoint();
+ String serviceIdString = extractId(id, url);
+ ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
- return new ServiceReplyEvent(false, "Unknown", epname, friendly);
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " does not exist.", serviceIdString, id);
}
+ id = sset.getId().getFriendly(); // must ensure the ev has the numeric id because we work entirely with that from now on
+ url = sset.getKey(); // also ensure url is there for messages
+ ev.setEndpoint(url);
+ ev.setFriendly(id);
+
String userin = ev.getUser();
String userout = sset.getUser();
- if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
- return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
- }
+ logger.info(methodName, sset.getId(), "Unregister received from", userin);
- if ( sset.isRegistered() ) {
- sset.deregister(); // just sets a flag so we know how to handle it when it starts to die
- pendingRequests.add(new ApiHandler(ev, this));
-// ApiHandler apih = new ApiHandler(ev, this);
-// Thread t = new Thread(apih);
-// t.start();
- return new ServiceReplyEvent(true, "Shutting down implementors", sset.getKey(), sset.getId().getFriendly());
- } else {
- return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
+ if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
+ return new ServiceReplyEvent(false, "Service " + serviceIdString + " Unregister declined: not owner.", serviceIdString, id);
}
+ serviceStateHandler.unregister(sset);
+ sset.deregister(); // just sets a flag so we know how to handle it when it starts to die
+ pendingRequests.add(new ApiHandler(ev, this));
+ return new ServiceReplyEvent(true, "Service " + serviceIdString + " unregistered. Shutting down implementors.", sset.getKey(), sset.getId().getFriendly());
}
//
// Everything to do this must be vetted before it is called. Run in a new thread to not hold up the API.
//
- void doUnregister(long friendly, String epname)
+ void doUnregister(long friendly, String url)
{
String methodName = "doUnregister";
- ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+ ServiceSet sset = serviceStateHandler.getUnregisteredService(friendly);
+ if ( sset == null ) {
+ logger.error(methodName, null, "Service", friendly, "(" + url + ") is not a known, unregistereed service. No action taken.");
+ return;
+ }
- if ( sset.countImplementors() > 0 ) {
- logger.debug(methodName, sset.getId(), "Stopping implementors:", friendly, epname);
- sset.stop();
- } else if ( sset.isPingOnly() ) {
- logger.debug(methodName, sset.getId(), "Unregister ping-only setvice:", friendly, epname);
+ if ( sset.isPingOnly() ) {
+ logger.info(methodName, sset.getId(), "Unregister ping-only setvice:", friendly, url);
sset.stop();
- serviceStateHandler.removeService(epname, friendly);
+ serviceStateHandler.removeService(sset);
+ sset.deleteProperties();
+ } else if ( sset.countImplementors() > 0 ) {
+ logger.debug(methodName, sset.getId(), "Stopping implementors:", friendly, url);
+ sset.stop(); // will call removeServices once everything is stopped
} else {
- logger.debug(methodName, sset.getId(), "Removing from map:", friendly, epname);
- serviceStateHandler.removeService(epname, friendly);
+ logger.debug(methodName, sset.getId(), "Removing from map:", friendly, url);
+ sset.clearQueue(); // will call removeServices if everything looks ok
}
- sset.deleteProperties();
+ }
+
+ /**
+ * This is used only to convert the user's provided ID into a string for messages.
+ */
+ String extractId(long friendly, String epname)
+ {
+ return ((epname == null) ? Long.toString(friendly) : epname);
+ }
+
+ void addInstance(ServiceSet sset, ServiceInstance inst)
+ {
+ serviceStateHandler.addImplementorFor(sset, inst);
+ }
+
+ void removeImplementor(ServiceSet sset, ServiceInstance inst)
+ {
+ serviceStateHandler.removeImplementorFor(sset, inst);
+ }
+
+ void removeService(ServiceSet sset)
+ {
+ String methodName = "deleteService";
+ if ( serviceStateHandler.hasService(sset.getId()) ) {
+ logger.error(methodName, sset.getId(), "Attempt to delete service while it is still registered: refused.");
+ } else {
+ serviceStateHandler.removeService(sset);
+ }
}
/**
* From: http://en.wikipedia.org/wiki/Topological_sorting
*
- * L ← Empty list that will contain the sorted elements
- * S ← Set of all nodes with no incoming edges
+ * L ← Empty list that will contain the sorted elements
+ * S ← Set of all nodes with no incoming edges
* while S is non-empty do
* remove a node n from S
* insert n into L
@@ -1126,309 +1008,315 @@ public class ServiceHandler
* else
* return L (a topologically sorted order)
*/
- class CycleChecker
- {
- ServiceSet sset;
- int edges = 0;
- List<String> cycles = null;
-
- CycleChecker(ServiceSet sset)
- {
- this.sset = sset;
- }
-
- boolean hasCycle()
- {
- // Start by building the dependency graph
- // TODO: Maybe consider saving this. Not clear there's much of a
- // gain doing the extra bookeeping beause the graphs will always
- // be small and will only need checking on registration or arrival
- // of a submitted service. So this cycle checking is always
- // fast anyway.
- //
- // Bookeeping could be a bit ugly because a submitted service could
- // bop in and change some dependency graph. We really only care
- // for checking cycles, so we'll check the cycles as things change
- // and then forget about it.
- //
- String[] deps = sset.getIndependentServices();
- if ( deps == null ) return false; // man, that was fast!
-
- Map<String, ServiceSet> visited = new HashMap<String, ServiceSet>(); // all the nodes in the graph
- clearEdges(sset, visited);
-
- List<ServiceSet> nodes = new ArrayList<ServiceSet>();
- nodes.addAll(visited.values());
- buildGraph(nodes);
+// class CycleChecker
+// {
+// ServiceSet sset;
+// int edges = 0;
+// List<String> cycles = null;
+
+// CycleChecker(ServiceSet sset)
+// {
+// this.sset = sset;
+// }
+
+// boolean hasCycle()
+// {
+// // Start by building the dependency graph
+// // TODO: Maybe consider saving this. Not clear there's much of a
+// // gain doing the extra bookeeping beause the graphs will always
+// // be small and will only need checking on registration or arrival
+// // of a submitted service. So this cycle checking is always
+// // fast anyway.
+// //
+// // Bookeeping could be a bit ugly because a submitted service could
+// // bop in and change some dependency graph. We really only care
+// // for checking cycles, so we'll check the cycles as things change
+// // and then forget about it.
+// //
+// String[] deps = sset.getIndependentServices();
+// if ( deps == null ) return false; // man, that was fast!
+
+// Map<String, ServiceSet> visited = new HashMap<String, ServiceSet>(); // all the nodes in the graph
+// clearEdges(sset, visited);
+
+// List<ServiceSet> nodes = new ArrayList<ServiceSet>();
+// nodes.addAll(visited.values());
+// buildGraph(nodes);
- List<ServiceSet> sorted = new ArrayList<ServiceSet>(); // topo-sorted list of nodes
- List<ServiceSet> current = new ArrayList<ServiceSet>(); // nodes with no incoming edges
+// List<ServiceSet> sorted = new ArrayList<ServiceSet>(); // topo-sorted list of nodes
+// List<ServiceSet> current = new ArrayList<ServiceSet>(); // nodes with no incoming edges
- // Constant: current has all nodes with no incoming edges
- for ( ServiceSet node : nodes ) {
- if ( ! node.hasPredecessor() ) current.add(node);
- }
-
- while ( current.size() > 0 ) {
- ServiceSet next = current.remove(0); // remove a node n from S
- sorted.add(next); // insert n int L
- List<ServiceSet> successors = next.getSuccessors();
- for ( ServiceSet succ : successors ) { // for each node m(pred) with an edge e from n to m do
- next.removeSuccessor(succ); // remove edge from graph
- succ.removePredecessor(next); // ...
- edges--;
- if ( !succ.hasPredecessor() ) current.add(succ); // if m(pred) has no incoming edges insert m into S
- }
- }
-
- if ( edges == 0 ) return false; // if graph has no edges, no cycles
-
- cycles = new ArrayList<String>(); // oops, and here they are
- for ( ServiceSet node : nodes ) {
- if ( node.hasSuccessor() ) {
- for ( ServiceSet succ : node.getSuccessors() ) {
- cycles.add(node.getKey() + " -> " + succ.getKey());
- }
- }
- }
- return true;
- }
-
- String getCycles()
- {
- return cycles.toString();
- }
+// // Constant: current has all nodes with no incoming edges
+// for ( ServiceSet node : nodes ) {
+// if ( ! node.hasPredecessor() ) current.add(node);
+// }
+
+// while ( current.size() > 0 ) {
+// ServiceSet next = current.remove(0); // remove a node n from S
+// sorted.add(next); // insert n int L
+// List<ServiceSet> successors = next.getSuccessors();
+// for ( ServiceSet succ : successors ) { // for each node m(pred) with an edge e from n to m do
+// next.removeSuccessor(succ); // remove edge from graph
+// succ.removePredecessor(next); // ...
+// edges--;
+// if ( !succ.hasPredecessor() ) current.add(succ); // if m(pred) has no incoming edges insert m into S
+// }
+// }
+
+// if ( edges == 0 ) return false; // if graph has no edges, no cycles
+
+// cycles = new ArrayList<String>(); // oops, and here they are
+// for ( ServiceSet node : nodes ) {
+// if ( node.hasSuccessor() ) {
+// for ( ServiceSet succ : node.getSuccessors() ) {
+// cycles.add(node.getKey() + " -> " + succ.getKey());
+// }
+// }
+// }
+// return true;
+// }
+
+// String getCycles()
+// {
+// return cycles.toString();
+// }
- //
- // Traveerse the graph and make sure all the nodes are "clean"
- //
- void clearEdges(ServiceSet node, Map<String, ServiceSet> visited)
- {
- String key = node.getKey();
- node.clearEdges();
- if ( visited.containsKey(key) ) return;
-
- visited.put(node.getKey(), node);
- String[] deps = node.getIndependentServices();
- if ( deps == null ) return;
+// //
+// // Traveerse the graph and make sure all the nodes are "clean"
+// //
+// void clearEdges(ServiceSet node, Map<String, ServiceSet> visited)
+// {
+// String key = node.getKey();
+// node.clearEdges();
+// if ( visited.containsKey(key) ) return;
+
+// visited.put(node.getKey(), node);
+// String[] deps = node.getIndependentServices();
+// if ( deps == null ) return;
- for ( String dep : deps ) {
- ServiceSet sset = serviceStateHandler.getServiceByName(dep);
- if ( sset != null ) {
- clearEdges(sset, visited);
- }
- }
- }
+// for ( String dep : deps ) {
+// ServiceSet sset = serviceStateHandler.getServiceByName(dep);
+// if ( sset != null ) {
+// clearEdges(sset, visited);
+// }
+// }
+// }
- void buildGraph(List<ServiceSet> nodes)
- {
- for ( ServiceSet node : nodes ) {
- String[] deps = node.getIndependentServices(); // never null if we get this far
- if ( deps != null ) {
- for ( String d : deps ) {
- ServiceSet outgoing = serviceStateHandler.getServiceByName(d);
- if ( outgoing == null ) continue;
- outgoing.setIncoming(node);
- node.setOutgoing(outgoing);
- edges++;
- }
- }
- }
- }
- }
-
- class ServiceStateHandler
- {
-
- // Map of active service descriptors by endpoint. For UIMA services, key is the endpoint.
- private Map<String, ServiceSet> servicesByName = new HashMap<String, ServiceSet>();
- private Map<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>();
-
- // For each job, the collection of services it is dependent upon
- // DUccId is a Job Id (or id for serice that has dependencies)
- private Map<DuccId, Map<String, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<String, ServiceSet>>();
-
- ServiceStateHandler()
- {
- }
-
- /**
- * Return a copy of the keys so we can fetch the services in an orderly manner.
- */
- synchronized ArrayList<String> getServiceNames()
- {
- ArrayList<String> answer = new ArrayList<String>();
- for ( String k : servicesByName.keySet() ) {
- answer.add(k);
- }
- return answer;
- }
-
- synchronized ServiceSet getServiceByName(String n)
- {
- return servicesByName.get(n);
- }
-
- synchronized ServiceSet getServiceByFriendly(long id)
- {
- return servicesByFriendly.get( id );
- }
-
- // API passes in a friendly (maybe) and an endpiont (maybe) but only one of these
- // Here we look up the service by whatever was passed in.
- synchronized ServiceSet getServiceForApi(long id, String n)
- {
- if ( n == null ) return getServiceByFriendly(id);
- return getServiceByName(n);
- }
+// void buildGraph(List<ServiceSet> nodes)
+// {
+// for ( ServiceSet node : nodes ) {
+// String[] deps = node.getIndependentServices(); // never null if we get this far
+// if ( deps != null ) {
+// for ( String d : deps ) {
+// ServiceSet outgoing = serviceStateHandler.getServiceByName(d);
+// if ( outgoing == null ) continue;
+// outgoing.setIncoming(node);
+// node.setOutgoing(outgoing);
+// edges++;
+// }
+// }
+// }
+// }
+// }
+
+ class ServiceStateHandler
+ {
+
+// // Map of active service descriptors by endpoint. For UIMA services, key is the endpoint.
+ private Map<String, ServiceSet> registeredServicesByUrl = new HashMap<String, ServiceSet>();
+ private Map<Long, ServiceSet> registeredServicesById = new HashMap<Long, ServiceSet>();
+ private Map<Long, ServiceSet> unregisteredServicesById = new HashMap<Long, ServiceSet>();
+ private Map<String, ServiceSet> unregisteredServicesByUrl = new HashMap<String, ServiceSet>();
+
+ private Map<Long, ServiceSet> servicesByImplementor = new HashMap<Long, ServiceSet>();
+
+ // private Map<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>();
+
+// // For each job, the collection of services it is dependent upon
+// // DuccId is a Job Id (or id for service that has dependencies)
+ private Map<DuccId, Map<Long, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<Long, ServiceSet>>();
+
+// ServiceStateHandler()
+// {
+// }
+
+// /**
+// * Return a copy of the keys so we can fetch the services in an orderly manner.
+// */
+// synchronized ArrayList<String> getServiceNames()
+// {
+// ArrayList<String> answer = new ArrayList<String>();
+// for ( String k : servicesByName.keySet() ) {
+// answer.add(k);
+// }
+// return answer;
+// }
+
+ synchronized void unregister(ServiceSet sset)
+ {
+ String methodName = "ServiceStateHandler.unregister";
+ String key = sset.getKey();
+ long fid = sset.getId().getFriendly();
+ logger.info(methodName, sset.getId(), "Removing", key, fid);
+ registeredServicesByUrl.remove(key);
+ registeredServicesById.remove(fid);
+
+ unregisteredServicesById.put(fid, sset);
+ unregisteredServicesByUrl.put(key, sset);
+ }
+
+// synchronized ServiceSet getUnregisteredService(String url)
+// {
+// return unRegisteredServicesByUrl.get(url);
+// }
+
+ synchronized boolean hasService(DuccId id)
+ {
+ String methodName = "ServiceStateHandler.hasService";
+
+ logger.info(methodName, null, "containsKey", id, registeredServicesById.containsKey(id.getFriendly()));
+ return registeredServicesById.containsKey(id.getFriendly());
+ }
+
+ synchronized void registerService(Long id, String ep, ServiceSet sset)
+ {
+ String methodName = "ServiceStateHandler.registerService";
+
+ logger.info(methodName, sset.getId(), "adding", ep, id);
+
+ registeredServicesByUrl.put(ep, sset);
+ registeredServicesById.put(id, sset);
+ }
+
+ synchronized ServiceSet getServiceByUrl(String n)
+ {
+ return registeredServicesByUrl.get(n);
+ }
+
+ synchronized List<ServiceSet> getServices()
+ {
+ ArrayList<ServiceSet> answer = new ArrayList<ServiceSet>();
+ for ( ServiceSet sset : registeredServicesByUrl.values() ) {
+ answer.add(sset);
+ }
+ return answer;
+ }
+
+ synchronized void addImplementorFor(ServiceSet sset, ServiceInstance inst)
+ {
+ servicesByImplementor.put(inst.getId(), sset);
+ }
+
+ synchronized ServiceSet getServiceByImplementor(long id)
+ {
+ return servicesByImplementor.get(id);
+ }
+
+ synchronized void removeImplementorFor(ServiceSet sset, ServiceInstance inst)
+ {
+ servicesByImplementor.remove(inst.getId());
+ }
+
+// synchronized ServiceSet getServiceByFriendly(long id)
+// {
+// return servicesByFriendly.get( id );
+// }
+
+ // API passes in a friendly (maybe) and an endpoint (maybe) but only one of these
+ // Here we look up the service by whatever was passed in.
+ synchronized ServiceSet getServiceForApi(long id, String n)
+ {
+ if ( n == null ) return registeredServicesById.get(id);
+ return registeredServicesByUrl.get(n);
+ }
+
+ synchronized ServiceSet getUnregisteredService(long id)
+ {
+ return unregisteredServicesById.get(id);
+ }
+
+ synchronized ServiceSet getUnregisteredServiceByUrl(String url)
+ {
+ return unregisteredServicesByUrl.get(url);
+ }
+
+
+// synchronized void putServiceByName(String n, ServiceSet s)
+// {
+// servicesByName.put(n, s);
+// DuccId id = s.getId();
+// if ( id != null ) {
+// servicesByFriendly.put(id.getFriendly(), s);
+// }
+// }
+
+
+ synchronized void removeService(ServiceSet sset)
+ {
+ String key = sset.getKey();
+ long id = sset.getId().getFriendly();
+ unregisteredServicesById.remove(id);
+ unregisteredServicesByUrl.remove(key);
+
+ // The registeredServices need to have been removed during unregister which is the only way
+ // to get rid of a service.
+ Long[] implids = sset.getImplementors();
+ for ( Long l : implids ) {
+ servicesByImplementor.remove(l);
+ }
+
+ DuccId[] refids = sset.getReferences();
+ for ( DuccId rid : refids) {
+ servicesByJob.remove(rid);
+ }
+ }
+
+// synchronized void removeService(long id)
+// {
+// ServiceSet sset = servicesByFriendly.remove(id);
+// if ( sset != null ) {
+// String key = sset.getKey();
+// servicesByName.remove(key);
+// }
+// }
+
+// synchronized void removeService(String n, long id)
+// {
+// if ( n == null ) removeService(id);
+// else removeService(n);
+// }
+
+ synchronized Map<Long, ServiceSet> getServicesForJob(DuccId id)
+ {
+ return servicesByJob.get(id);
+ }
+
+ synchronized void putServiceForJob(DuccId id, ServiceSet s)
+ {
+ Map<Long, ServiceSet> services = servicesByJob.get(id);
+ if ( services == null ) {
+ services = new HashMap<Long, ServiceSet>();
+ servicesByJob.put(id, services);
+ }
+ services.put(s.getId().getFriendly(), s);
+ }
+
+ synchronized void removeServicesForJob(DuccId id)
+ {
+ servicesByJob.remove(id);
+ }
+
+// synchronized void recordNewServices(Map<String, ServiceSet> services)
+// {
+// servicesByName.putAll(services);
+// }
- synchronized List<ServiceSet> getRegisteredServices()
- {
- ArrayList<ServiceSet> answer = new ArrayList<ServiceSet>();
- for ( ServiceSet sset : servicesByName.values() ) {
- if ( sset.isRegistered() ) {
- answer.add(sset);
- }
- }
- return answer;
- }
+ }
- synchronized void putServiceByName(String n, ServiceSet s)
- {
- servicesByName.put(n, s);
- DuccId id = s.getId();
- if ( id != null ) {
- servicesByFriendly.put(id.getFriendly(), s);
- }
- }
-
- synchronized ServiceSet removeService(String n)
- {
- ServiceSet s = servicesByName.remove(n);
- if ( s != null ) {
- DuccId id = s.getId();
- if ( id != null ) {
- servicesByFriendly.remove(id.getFriendly());
- }
- }
- return s;
- }
-
- synchronized void removeService(long id)
- {
- ServiceSet sset = servicesByFriendly.remove(id);
- if ( sset != null ) {
- String key = sset.getKey();
- servicesByName.remove(key);
- }
- }
-
- synchronized void removeService(String n, long id)
- {
- if ( n == null ) removeService(id);
- else removeService(n);
- }
-
- synchronized Map<String, ServiceSet> getServicesForJob(DuccId id)
- {
-
- return servicesByJob.get(id);
- }
-
- synchronized void putServiceForJob(DuccId id, ServiceSet s)
- {
- Map<String, ServiceSet> services = servicesByJob.get(id);
- if ( services == null ) {
- services = new HashMap<String, ServiceSet>();
- servicesByJob.put(id, services);
- }
- services.put(s.getKey(), s);
- }
-
- synchronized void removeServicesForJob(DuccId id)
- {
- servicesByJob.remove(id);
- }
-
- synchronized void recordNewServices(Map<String, ServiceSet> services)
- {
- servicesByName.putAll(services);
- }
-
- }
-
- /**
- * Tester for topo sorter.
- * Input is props file, e.g. for the graph:
- * A -> B, A -> C, B -> C:
- *
- * services = A B C
- * svc.A = B C
- * svc.B = C
- * svc.C =
- *
- */
- private void runSortTester(String propsfile)
- {
- int friendly = 1;
- DuccProperties props = new DuccProperties();
- try {
- props.load(propsfile);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- System.exit(1);
- }
-
-
- String svcnames = props.getStringProperty("services");
- String[] svcs = svcnames.split("\\s");
- ServiceSet[] allServices = new ServiceSet[svcs.length];
- int ndx = 0;
- for ( String svc : svcs ) {
- svc = svc.trim();
- String key = "UIMA-AS:" + svc + ":tcp://foo:123";
- ServiceSet dep = serviceStateHandler.getServiceByName(key);
- if ( dep == null ) {
- dep = new ServiceSet(new DuccId(friendly++), new DuccId(0), key, null);
- serviceStateHandler.putServiceByName(key, dep);
- allServices[ndx++] = dep;
- }
-
- String depnames = props.getStringProperty("svc." + svc);
- String[] deps = depnames.split("\\s");
- List<String> subdeps = new ArrayList<String>();
- for ( String subsvc : deps ) {
- subsvc = subsvc.trim();
- if ( subsvc.equals("")) continue;
-
- String subkey = "UIMA-AS:" + subsvc + ":tcp://foo:123";
- ServiceSet subdep = serviceStateHandler.getServiceByName(subkey);
- if ( subdep == null ) {
- subdep = new ServiceSet(new DuccId(friendly++), new DuccId(0), subkey, null);
- serviceStateHandler.putServiceByName(subkey, subdep);
- allServices[ndx++] = subdep;
- }
- subdeps.add(subkey);
- }
- if ( subdeps.size() > 0 ) {
- dep.setIndependentServices(subdeps.toArray(new String[subdeps.size()]));
- }
- }
-
- CycleChecker cc = new CycleChecker(allServices[0]);
- if ( cc.hasCycle() ) {
- System.out.println("Service dependencies contain a cycle with " + cc.getCycles());
- } else {
- System.out.println("No cycles detected");
- }
-
- }
// tester for the topo sorter
public static void main(String[] args)
{
- ServiceHandler sh = new ServiceHandler(null);
- sh.runSortTester(args[0]);
- }
+
+ }
}