You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2014/02/03 20:35:49 UTC
svn commit: r1564022 [4/5] - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/
uima-ducc-examples/src/main/resources/org/apache/uima/ducc/test/servi...
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1564022&r1=1564021&r2=1564022&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Mon Feb 3 19:35:48 2014
@@ -18,18 +18,16 @@
*/
package org.apache.uima.ducc.sm;
-import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
@@ -46,6 +44,8 @@ import org.apache.uima.ducc.common.utils
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.id.ADuccId;
import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.sm.IServiceDescription;
import org.apache.uima.ducc.transport.event.sm.ServiceDescription;
@@ -56,27 +56,31 @@ import org.apache.uima.util.Level;
* Represents the collection of process, jobs, and such that implement a given service.
*/
+@SuppressWarnings("serial")
public class ServiceSet
implements SmConstants
{
/**
*
*/
- private static final long serialVersionUID = 1L;
private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);
-
+ private ServiceHandler handler;
+
// key is unique id of descriptor. The descriptor inherites key from a Job's DuccId, or from
// a unique-to SM key for implicit references.
- HashMap<DuccId, JobState> implementors = new HashMap<DuccId, JobState>();
+
+ Map<Long, ServiceInstance> implementors = new HashMap<Long, ServiceInstance>();
+ List<ServiceInstance> pendingStarts = new LinkedList<ServiceInstance>();
// key is job/service id, value is same. it's a map for fast existence check
- HashMap<DuccId, DuccId> references = new HashMap<DuccId, DuccId>();
+ Map<DuccId, DuccId> references = new HashMap<DuccId, DuccId>();
// For a registered service, here is my registered id
DuccId id;
HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
String history_key = "work-instances";
+ String implementors_key = "implementors";
// incoming nodes, for dup checking
List<ServiceSet> predecessors = new ArrayList<ServiceSet>();
@@ -96,24 +100,23 @@ public class ServiceSet
// Registered services, the submitter
String user;
- // Registered services, Automatically start at boot, and keep implementors alive
+ // Automatically start at boot, and keep implementors alive
boolean autostart = false;
- // Registered services, we've been stopped. Must remember in order to counteract autostart.
+ // We've been stopped, which is used to override autostart
boolean stopped = false;
- // Registered services, remember if was started by reference only so we can stop when refs die
- boolean referenced_start = false;
+ // We've been started, so we know to enforce instance count even if not autostarted
+ boolean started = false;
+ // Remember if it was started by reference only, so we can stop it when the references die
+ boolean reference_start = false;
+ // is it ping-only?
+ boolean ping_only = false;
- // Registered services, the number of instances to maintain
+ // The number of instances to maintain live.
int instances = 1;
- // Service pinger
+ // Service monitor / pinger
IServiceMeta serviceMeta = null;
- // After stopping a pinger we need to discard it,and we usually need to stop it well before
- // it is ok to delete residual state such as UIMA-AS queues. Instead of discarding it, we
- // stash it here to use once the last implementor seems to be dead.
- IServiceMeta residualMeta = null;
-
// registered services state files
DuccProperties job_props = null;
DuccProperties meta_props = null;
@@ -123,136 +126,46 @@ public class ServiceSet
ServiceType service_type = ServiceType.Undefined;
ServiceClass service_class = ServiceClass.Undefined;
- ServiceState service_state = ServiceState.Undefined;;
+ ServiceState service_state = ServiceState.Stopped;;
// structures to manage service linger after it exits
Timer timer = null;
LingerTask linger = null;
- long linger_time = 5000;
-
- int failure_max = ServiceManagerComponent.failure_max;
- int failure_run = 0; // max allowed consecutive failures, current failure count
-
- //JobState job_state = JobState.Undefined;
- //
- // This is the constructor for an implicit service
- //
- public ServiceSet(String key, DuccId id)
- {
- this.key = key;
- this.id = id;
- parseEndpoint(key);
-
- if ( this.service_type == ServiceType.Custom ) {
- throw new IllegalStateException("Custom services may not be referenced as Implicit services.");
- }
-
- this.service_type = ServiceType.UimaAs;
- this.service_class = ServiceClass.Implicit;
-
- String state_dir = System.getProperty("DUCC_HOME") + "/state";
-
- // Need job props and meta props for webserver.
- // The pinger is always the default configured UIMA-AS pinger.
- //
- // job props: working_directory, log_directory
- // meta props: endpoint, user
- job_props = new DuccProperties();
- // job_props.put("working_directory", System.getProperty("user.dir")); // whatever my current dir is
- // job_props.put("log_directory", System.getProperty("user.dir") + "/../logs");
- //job_props.put("service_ping_jvm_args", "-Xmx50M");
- props_filename = state_dir + "/services/" + id.toString() + ".svc";
- saveServiceProperties();
-
- meta_props = new DuccProperties();
- meta_props.put("user", System.getProperty("user.name"));
- meta_props.put("endpoint", key);
- meta_props.put("service-class", ""+service_class.decode());
- meta_props.put("service-type", ""+service_type.decode());
- meta_props.put("stopped", ""+stopped);
- meta_props.put("service-state", ""+getServiceState());
- meta_props.put("ping-active", "false");
- meta_props.put("service-alive", "false");
- meta_props.put("service-healthy", "false");
- meta_props.put("service-statistics", "N/A");
- meta_props.put("ping-only", "true");
-
- meta_filename = state_dir + "/services/" + id.toString() + ".meta";
- saveMetaProperties();
- }
-
- //
- // Constructor for a submitted service
- //
- public ServiceSet(DuccId id, DuccId first_implementor, String key, String[] independentServices)
- {
- this.key = key;
- this.id = id;
- this.friendly_ids.put(first_implementor.getFriendly(), null);
-
- this.independentServices = independentServices;
- this.service_class = ServiceClass.Submitted;
+ long linger_time = 60000;
- parseEndpoint(key);
-
- String state_dir = System.getProperty("DUCC_HOME") + "/state";
- // Need job props and meta props for webserver.
- // The pinger is always the default configured UIMA-AS pinger.
- // Submitted services must always be UIMA-AS services, for now, checked in caller.
- //
- // job props: working_directory, log_directory
- // meta props: endpoint, user
- job_props = new DuccProperties();
- //job_props.put("service_ping_jvm_args", "-Xmx50M");
- props_filename = state_dir + "/services/" + id.toString() + ".svc";
- saveServiceProperties();
-
- meta_props = new DuccProperties();
- meta_props.put("user", System.getProperty("user.name"));
- meta_props.put("endpoint", key);
- meta_props.put("service-class", ""+service_class.decode());
- meta_props.put("service-type", ""+service_type.decode());
- meta_props.put("stopped", ""+stopped);
- meta_props.put("service-state", ""+getServiceState());
- meta_props.put("ping-active", "false");
- meta_props.put("service-alive", "false");
- meta_props.put("service-healthy", "false");
- meta_props.put("service-statistics", "N/A");
- meta_props.put("implementors", ""+id.getFriendly());
- meta_props.put("ping-only", "false");
+ int init_failures_max = ServiceManagerComponent.failure_max;
+ int init_failures = 0; // current count of consecutive init failures (limit is init_failures_max)
+
+ int ping_failures_max = ServiceManagerComponent.failure_max;
+ int ping_failures = 0; // for ping-only services, if the external pinger throws errors we
+ // need to govern it
- meta_filename = state_dir + "/services/" + id.toString() + ".meta";
- saveMetaProperties();
- }
+ int run_failures = 0;
+ boolean excessiveRunFailures = false; // signalled by monitor / pinger if we have too many
//
// Constructor for a registered service
//
- public ServiceSet(DuccId id, String props_filename, String meta_filename, DuccProperties props, DuccProperties meta)
+ public ServiceSet(ServiceHandler handler, DuccId id, String props_filename, String meta_filename, DuccProperties props, DuccProperties meta)
{
+ this.handler = handler;
this.job_props = props;
this.meta_props = meta;
this.id = id;
this.props_filename = props_filename;
this.meta_filename = meta_filename;
- this.service_state = ServiceState.NotAvailable;
- this.linger_time = props.getLongProperty(RegistrationOption.ServiceLinger.decode(), 5000);
+ this.service_state = ServiceState.Stopped;
+ this.linger_time = props.getLongProperty(RegistrationOption.ServiceLinger.decode(), linger_time);
this.key = meta.getProperty("endpoint");
- this.failure_max = props.getIntProperty(UiOption.InstanceFailuresLimit.pname(), ServiceManagerComponent.failure_max);
+ this.init_failures_max = props.getIntProperty(UiOption.InstanceFailuresLimit.pname(), ServiceManagerComponent.failure_max);
parseEndpoint(key);
this.user = meta.getProperty("user");
this.instances = meta.getIntProperty("instances", 1);
this.autostart = meta.getBooleanProperty("autostart", false);
-
- String idprop = meta.getProperty("implementors", null);
- if ( idprop != null ) {
- String[] ids = idprop.split("\\s");
- for ( String i : ids ) {
- friendly_ids.put(Long.parseLong(i), null);
- }
- }
+ this.ping_only = meta.getBooleanProperty("ping-only", false);
+ this.stopped = meta.getBooleanProperty("stopped", stopped);
this.service_class = ServiceClass.Registered;
parseIndependentServices();
@@ -264,19 +177,35 @@ public class ServiceSet
job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
}
+ meta_props.remove("references"); // Will get refreshed in upcoming OR state messages
meta_props.put("service-class", ""+service_class.decode());
meta_props.put("service-type", ""+service_type.decode());
meta_props.put("stopped", ""+stopped);
- meta_props.put("service-state", ""+getServiceState());
+ meta_props.put("service-state", ""+getState());
meta_props.put("ping-active", "false");
meta_props.put("service-alive", "false");
meta_props.put("service-healthy", "false");
meta_props.put("service-statistics", "N/A");
- if ( isStartable() ) {
- meta_props.put("ping-only", "false");
- } else {
+ if ( (!job_props.containsKey("process_executable")) && (service_type != ServiceType.UimaAs) ) {
meta_props.put("ping-only", "true");
+ this.ping_only = true;
+ } else {
+ meta_props.put("ping-only", "false");
+ this.ping_only = false;
+ // implementors.clear(); // will fill in later if this is hot restart
+ // String idprop = meta.getProperty("implementors", null);
+ // if ( idprop != null ) { // recover implementors on restart, possibly
+ // String[] ids = idprop.split("\\s");
+ // for ( String i : ids ) {
+ // long lid = Long.parseLong(i);
+ // ServiceInstance si = new ServiceInstance(this);
+ // implementors.put(lid, si);
+ // si.setId(lid);
+ // si.setUser(this.user);
+ // handler.addInstance(this, si);
+ // }
+ // }
}
// caller will save the meta props, **if** the rest of registration is ok.
@@ -331,13 +260,23 @@ public class ServiceSet
history = history + " " + id.toString();
}
meta_props.put(history_key, history);
- meta_props.remove("implementors");
+ meta_props.remove(implementors_key);
ServiceManagerComponent.deleteProperties(id.toString(), meta_filename, meta_props, props_filename, job_props);
meta_filename = null;
props_filename = null;
}
+ synchronized Long[] getImplementors()
+ {
+ return implementors.keySet().toArray(new Long[implementors.size()]);
+ }
+
+ synchronized DuccId[] getReferences()
+ {
+ return references.keySet().toArray(new DuccId[references.size()]);
+ }
+
void setIncoming(ServiceSet sset)
{
predecessors.add(sset);
@@ -349,11 +288,6 @@ public class ServiceSet
successors.clear();
}
- boolean isStopped()
- {
- return stopped;
- }
-
boolean hasPredecessor()
{
return predecessors.size() != 0;
@@ -400,6 +334,11 @@ public class ServiceSet
this.independentServices = ind;
}
+ void setJobProperty(String k, String v)
+ {
+ job_props.put(k, v);
+ }
+
private void parseIndependentServices()
{
String depstr = job_props.getProperty(RegistrationOption.ServiceDependency.decode());
@@ -415,96 +354,103 @@ public class ServiceSet
}
/**
- * A service is startable if it's UIMA-AS, or if it's CUSTOM and has a process_executable
- * associated with it. A non-startable CUSTOM service may have only a pinger.
- */
- boolean isStartable()
- {
- switch ( service_type ) {
- case UimaAs:
- return true;
- case Custom:
- return job_props.containsKey("process_executable");
- }
- return false; // redundant but needed for compilation
- }
-
- /**
* At boot only ... synchronize my state with published OR state.
+ *
+ * We do this in the first phase of boot, then bootComplete is called to synchronize
+ * history and update the physical meta properties file.
*/
- void synchronizeImplementors(Map<DuccId, JobState> work)
+ Map<Long, ServiceInstance> pendingImplementors = new HashMap<Long, ServiceInstance>();
+ void bootImplementor(DuccId id, JobState state)
{
- HashMap<Long, DuccId> newmap = new HashMap<Long, DuccId>();
- // first loop synchronized 'friendly_ids' with live jobs comining in
- for ( DuccId id : work.keySet() ) {
-
- long fid = id.getFriendly();
- if ( friendly_ids.containsKey(fid) ) {
- JobState js = work.get(id);
- implementors.put(id, js);
- newmap.put(fid, id);
- }
- }
+ ServiceInstance si = new ServiceInstance(this);
- // second loop synchronizes history with jobs that used to be live and aren't any more (because of restart)
- String history = meta_props.getStringProperty(history_key, "");
- for ( Long friendly : friendly_ids.keySet() ) {
- DuccId id = newmap.get(friendly);
- if ( id == null ) {
- history = history + " " + friendly;
- }
- }
- meta_props.put(history_key, history);
+ si.setState(state);
+ si.setId(id.getFriendly());
+ si.setStopped(false);
+ si.setUser(this.user);
- friendly_ids = newmap; // replace persisted version with validated version from OR state
- persistImplementors();
+ handler.addInstance(this, si);
+ pendingImplementors.put(id.getFriendly(), si);
}
/**
- *
+ * Second phase, update history, and physical metaprops.
*/
- void enforceAutostart()
+ void bootComplete()
{
- //String methodName = "enforceAutostart";
- if ( ! autostart ) return; // not doing auto, nothing to do
- if ( stopped ) return; // doing auto, but we've been manually stopped
- if ( failure_run >= failure_max ) return; // too many failures, no more enforcement
+ //
+ // During boot, inactive implementors are removed. Here we cull the implementors list to
+ // remove stuff that didn't come in.
+ //
- if ( (!isStartable()) && (serviceMeta == null) ) { // ping-only and pinger not alive
- start(); // ... then it needs to be started
- return;
+ if ( isPingOnly() && ! stopped) {
+ start(1); // nothing to recover but we need the pseudo service to run
+ return;
}
+ implementors = pendingImplementors; // only the ones that check in. others are toast
- // could have more implementors than instances if some were started dynamically but the count not persisted
- int needed = Math.max(0, instances - countImplementors());
+ //
+ // must update history against stuff we used to have and don't any more
+ //
+ String old_impls = meta_props.getProperty(implementors_key);
+ if ( old_impls != null ) {
+ Map<String, String> ip = new HashMap<String, String>();
+ String[] keys = old_impls.split("\\s+");
+ for ( String k : keys ) ip.put(k, k);
+
+ String history = meta_props.getProperty(history_key);
+ Map<String, String> hp = new HashMap<String, String>();
+ if ( history != null ) {
+ keys = history.split("\\s+");
+ for ( String k : keys ) hp.put(k, k);
+ }
+
+ // here, walk the implementors we used to know about; any that did not
+ // check in during boot are moved to history.
+ for ( String k : ip.keySet() ) {
+ Long iid = Long.parseLong(k);
+ if ( ! implementors.containsKey(iid) ) {
+ hp.put(k, k);
+ }
+ }
- while ( (needed--) > 0 ) {
- if ( ! start() ) break;
+ // now put the history string back into the meta props
+ if ( hp.size() > 0 ) {
+ StringBuffer sb = new StringBuffer();
+ for (String s : hp.keySet() ) {
+ sb.append(s);
+ sb.append(" ");
+ }
+ meta_props.setProperty(history_key, sb.toString().trim());
+ }
}
+ saveMetaProperties();
}
+
/**
- * Add implementor that we have an OR-assigned DuccId for
+ *
*/
- public void addImplementor(DuccId id, JobState js)
- {
- if ( isSubmitted() ) {
- friendly_ids.put(id.getFriendly(), id);
- }
- implementors.put(id, js);
- persistImplementors();
+ synchronized void enforceAutostart()
+ {
+ String methodName = "enforceAutostart";
+ if ( ! autostart ) return; // not doing auto, nothing to do
+ if ( stopped ) return; // doing auto, but we've been manually stopped
+ if ( init_failures >= init_failures_max ) return; // too many init failures, no more enforcement
+
+ // if ( (isPingOnly()) && (serviceMeta == null) ) { // ping-only and monitor / pinger not alive
+ // logger.info(methodName, id, "Autostarting 1 ping-only instance.");
+
+ // start(1); // ... then it needs to be started
+ // return;
+ // }
+
+ // could have more implementors than instances if some were started dynamically but the count not persisted
+ int needed = Math.max(0, instances - countImplementors());
+ logger.info(methodName, id, "Autostarting", needed, "instance" + ((needed > 1) ? "s" : ""), "already have", countImplementors());
+ start(needed);
}
- void promote()
- {
- String methodName = "promote";
- logger.debug(methodName, null, "Promoting", key);
- switch ( service_class ) {
- case Implicit : this.service_class = ServiceClass.Submitted; break;
- case Submitted: break;
- default : throw new IllegalStateException("Trying to promote a Registered service!");
- }
- }
boolean isUimaAs()
{
@@ -516,11 +462,6 @@ public class ServiceSet
return (service_type == ServiceType.Custom);
}
- Map<DuccId, JobState> getImplementors()
- {
- return implementors;
- }
-
DuccProperties getJobProperties()
{
return job_props;
@@ -531,50 +472,118 @@ public class ServiceSet
return meta_props;
}
- boolean isImplicit()
+ boolean isPingOnly()
{
- return (service_class == ServiceClass.Implicit);
+ return ping_only;
}
- boolean isSubmitted()
+ synchronized void setAutostart(boolean auto)
{
- return (service_class == ServiceClass.Submitted);
+ cancelLinger();
+ meta_props.setProperty("autostart", auto ? "true" : "false");
+ this.autostart = auto;
+ if ( auto ) {
+ // turning this on gives benefit of the doubt on failure management
+ // by definition, an autostarted service is NOT reference-started
+ reference_start = false;
+ init_failures = 0;
+ run_failures = 0;
+ excessiveRunFailures = false;
+ }
}
- boolean isPingOnly()
+ synchronized void restartPinger(String pingClass, String pingArguments, String pingClasspath, String pingJvmArgs, String pingTimeout, String pingDolog)
+ {
+ String methodName = "restartPinger";
+ logger.info(methodName, id, "Modify pinger:", pingClass, pingArguments, pingClasspath, pingJvmArgs, pingTimeout, pingDolog);
+ stopPingThread();
+ // To implement:
+ // Replace non-null properties in the registration properties file, save the props, then stop and restart the pinger.
+ // May only have to stop the pinger, and state machine will restart it for us.
+ // TODO: Deal with pingOnly, if it turns out to be a special case.
+ }
+
+ /**
+ * Manual start: turn off manual stop
+ * override reference_start
+ * remember manual start was done
+ */
+ synchronized void setStarted()
{
- return meta_props.containsKey("ping-only");
+ stopped = false;
+ reference_start = false;
+ started = true;
+ init_failures = 0;
+ run_failures = 0;
+ excessiveRunFailures = false;
}
- boolean isRegistered()
+ /**
+ * Manual stop: override reference_start and manual start.
+ * remember 'stopped' so enforceAutostart doesn't restart
+ */
+ synchronized void setStopped()
{
- return (service_class == ServiceClass.Registered) && (!deregistered);
+ reference_start = false;
+ started = false;
+ stopped = true;
}
- void setReferencedStart(boolean val)
+ /**
+ * Start by reference: if autostarted or already manually started, don't change anything
+ * else remember we're ref started and not stopped
+ */
+ synchronized void setReferencedStart(boolean is_start)
{
- this.referenced_start = val;
+ if ( is_start ) {
+ if ( isAutostart() || isStarted() ) return;
+ this.stopped = false;
+ this.reference_start = true;
+ init_failures = 0;
+ run_failures = 0;
+ excessiveRunFailures = false;
+ } else {
+ this.reference_start = false;
+ }
}
- boolean isReferencedStart()
+ /**
+ * "Manually" stopped.
+ */
+ synchronized boolean isStopped()
{
- return this.referenced_start;
+ return this.stopped;
}
- String getUser()
+ /**
+ * "Manually" started.
+ */
+ synchronized boolean isStarted()
{
- return user;
+ return this.started;
}
/**
- * True only if
- * a) this is a registereed service, and
- * b) it has been deregistered.
- * because you can't deregister a non-registered service.
+ * Started "by reference"
*/
+ synchronized boolean isReferencedStart()
+ {
+ return this.reference_start;
+ }
+
+ synchronized boolean isAutostart()
+ {
+ return this.autostart;
+ }
+
+ String getUser()
+ {
+ return user;
+ }
+
boolean isDeregistered()
{
- return (service_class == ServiceClass.Registered) && (deregistered);
+ return deregistered;
}
void deregister()
@@ -601,15 +610,29 @@ public class ServiceSet
{
String methodName = "saveMetaProperties";
+ if ( isDeregistered() ) return;
+
if ( meta_filename == null ) {
// if this is null it was deleted and this is some kind of lingering thread updating, that
// we don't really want any more
- logger.warn(methodName, id, "Meta properties is deleted, bypassing attempt to save.");
+ logger.error(methodName, id, "Meta properties is deleted, bypassing attempt to save.");
return;
}
+ if ( implementors.size() == 0 ) {
+ meta_props.remove(implementors_key);
+ } else {
+ StringBuffer sb = new StringBuffer();
+ for ( Long l : implementors.keySet() ) {
+ sb.append(Long.toString(l));
+ sb.append(" ");
+ }
+ String s = sb.toString().trim();
+ meta_props.setProperty(implementors_key, s);
+ }
+
meta_props.put("stopped", ""+stopped);
- meta_props.put("service-state", ""+ getServiceState());
+ meta_props.put("service-state", ""+ getState());
meta_props.put("ping-active", "" + (serviceMeta != null));
meta_props.put("service-alive", "false");
meta_props.put("service-healthy", "false");
@@ -629,7 +652,9 @@ public class ServiceSet
fos = new FileOutputStream(meta_filename);
meta_props.store(fos, "Meta descriptor");
} catch (FileNotFoundException e) {
- logger.warn(methodName, id, "Cannot save meta properties, file does not exist.");
+ if ( !isDeregistered() ) {
+ logger.warn(methodName, id, "Cannot save meta properties, file does not exist.");
+ }
} catch (IOException e) {
logger.warn(methodName, id, "I/O Error saving meta properties:", e);
} finally {
@@ -666,42 +691,7 @@ public class ServiceSet
if ( n != meta_props.getIntProperty("instances") ) {
meta_props.setProperty("instances", Integer.toString(n));
this.instances = n;
- saveMetaProperties();
- }
- }
-
- synchronized void setAutostart(boolean auto)
- {
- meta_props.setProperty("autostart", auto ? "true" : "false");
- this.autostart = auto;
- saveMetaProperties();
- if ( auto ) {
- // turning this on gives benefit of the doubt on failure management
- failure_run = 0;
- }
- }
-
- synchronized boolean isAutostart()
- {
- return autostart;
- }
-
- synchronized void persistImplementors()
- {
- if ( isImplicit() ) return;
-
- if ( friendly_ids.size() == 0 ) {
- meta_props.remove("implementors");
- } else {
- StringBuffer sb = new StringBuffer();
- for ( Long l : friendly_ids.keySet() ) {
- sb.append(Long.toString(l));
- sb.append(" ");
- }
- String s = sb.toString().trim();
- meta_props.setProperty("implementors", s);
}
- saveMetaProperties();
}
synchronized void persistReferences()
@@ -720,251 +710,400 @@ public class ServiceSet
saveMetaProperties();
}
- /**
- * If we're are registered service, and one of the "stringids" from submit
- * happens to match the friendly id passed in then this ServiceSet is
- * the representative object for the service.
- */
- boolean matches(DuccId did)
+ void clearQueue()
{
- if ( ! isRegistered() ) return false;
+ String methodName = "clearQueue";
+
+ if ( !deregistered ) {
+ logger.info(methodName, id, "Not clearing queue because service is still registered.");
+ return;
+ }
- if ( friendly_ids.containsKey(did.getFriendly()) ) {
- friendly_ids.put(did.getFriendly(), did);
- return true;
+ if ( implementors.size() != 0 ) {
+ logger.info(methodName, id, "Not clearing queue because", implementors.size(), "implementors are still alive (", key, ").");
+ return;
}
- return false;
- }
+ handler.removeService(this);
+ deleteProperties();
- boolean containsImplementor(DuccId id)
- {
- // must use friendly, in case the thing was just started and not into the implementors set yet
- return friendly_ids.containsKey(id.getFriendly());
- }
+ if ( service_type != ServiceType.UimaAs ) {
+ logger.info(methodName, id, "Deleting unregistered service; not clearing queue because this is not a UIMA-AS service:", key);
+ return;
+ }
- public void removeImplementor(DuccId id)
- {
- String methodName = "removeImplementors";
- if ( ! implementors.containsKey(id ) ) return; // quick short circuit if it's already gone
+ if ( isPingOnly() ) {
+ logger.info(methodName, id, "Deleting unregistered service; not clearing queue for ping-only service", key);
+ return;
+ }
- logger.debug(methodName, this.id, "Removing implementor", id);
- implementors.remove(id);
- friendly_ids.remove(id.getFriendly());
- if ( implementors.size() == 0 ) {
- stopPingThread();
+ String pingclass = job_props.getStringProperty("service_ping_class", UimaAsPing.class.getName());
+ if ( !pingclass.equals(UimaAsPing.class.getName()) ) {
+ logger.info(methodName, id, "Deleting unregistered service: not clearing queue because not using the default UIMA-AS pinger:", pingclass, "(", key, ")");
+ return;
}
- String history = meta_props.getStringProperty(history_key, "");
- history = history + " " + id.toString();
- meta_props.put(history_key, history);
- persistImplementors();
- //
- // So much to check here
- // - all implementors gone?
- // - is it a registered, not-ping-only service, or a submitted service (startable)
- // - is it a uima-as service, with an internal pinger?
- // Only if all that is true, we'll clear out the queues.
- //==
- logger.debug(methodName, id, "implementors.size()", implementors.size(),
- "service_class", service_class,
- "isStartable()", isStartable(),
- "isSubmitted()", isSubmitted(),
- "service_type", service_type,
- "ping_class", job_props.getStringProperty("service_ping_class", UimaAsPing.class.getName())
- );
-
- // This block is to clear the service queue from ActiveMq if it's no longer being used.
- if ( implementors.size() == 0 ) { // Went to 0 and there was a pinger?
- if ( ( (service_class == ServiceClass.Registered) && isStartable()) || isSubmitted() ) { // Is one of our happy cases (not ping-only, we don't know much about it.)
- if ( service_type == ServiceType.UimaAs ) {
- // Either no pinger specified, in which case the default is used. Or, it is specified, and if it
- // matches the default, we get to clear anyway.
- String pingclass = job_props.getStringProperty("service_ping_class", UimaAsPing.class.getName());
- if ( pingclass.equals(UimaAsPing.class.getName()) ) {
- UimaAsServiceMonitor monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
- logger.debug(methodName, id, "Clearing queues");
- try {
- monitor.init(null);
- monitor.clearQueues();
- } catch (Throwable e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- }
+ // Only do this if using the default pinger. It's the pinger's job otherwise.
+ UimaAsServiceMonitor monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
+ logger.info(methodName, id, "Deleting unregistered service and clearing queues for", key, "at [" + broker_host + ":" + broker_jmx_port + "]");
+ try {
+ monitor.init(null);
+ monitor.clearQueues();
+ monitor.stop( );
+ } catch (Throwable e) {
+ // totally not a problem, just lost it
+ logger.info(methodName, id, e.toString());
}
}
public synchronized int countImplementors()
{
- // The implementos and friendly_ids sets track each other carefully. The former
- // tracks process state via the remote process's DuccId. The latter tracks
- // what we've tried to start, but may not know the actual state for until we get
- // Orchestrator publications.
- //
- // To avoid ugly races, we consider a process to be an implementor as soon as we
- // get the "friendly" id from the Orchestrator.
- return friendly_ids.size();
+ return implementors.size();
}
- public synchronized int reference(DuccId id)
+ public synchronized int countReferences()
{
- String methodName = "reference";
- logger.debug(methodName, this.id, " ---------------- Service ", this.id, "references", id);
+ return references.size();
+ }
+
+ public synchronized int getActiveInstances()
+ {
+ int t = 0;
+ for ( ServiceInstance inst : implementors.values() ) {
+ if ( inst.getState() == JobState.Running ) t++;
+ }
+ return t;
+ }
+ synchronized void cancelLinger()
+ {
+ String methodName = "cancelLinger";
if ( linger != null ) {
logger.debug(methodName, this.id, " ---------------- Canceling linger task");
linger.cancel();
linger = null;
}
+ }
+
+ public synchronized void reference(DuccId id)
+ {
+ String methodName = "reference";
+
+ cancelLinger();
references.put(id, id);
+ logger.info(methodName, this.id, " References job/service", id, "count[" + references.size() + "] implementors [" + implementors.size() + "]");
persistReferences();
- return references.size();
+
+ for (ServiceInstance si : implementors.values() ) { // see if anything is running
+ logger.debug(methodName, this.id, "Implementor", si.getId(), "state:", si.getState());
+ if ( si.isRunning() ) return; // and if so, no need to start anything
+ }
+
+ // Nothing running, so we do referenced start.
+ setReferencedStart(true);
+ start(instances);
+
}
- public synchronized int dereference(DuccId id)
+ public synchronized void dereference(DuccId id)
{
+ String methodName = "dereference";
if ( references.remove(id) == null ) {
- throw new IllegalStateException("Id " + id + " not found in map for " + getKey());
+ logger.error(methodName, this.id, "Dereference job/service", id, "not found in map for", getKey());
+ return;
}
// stop the pinger if no longer needed
- if ( (references.size() == 0) && // nothing left
- ( isImplicit() || ( isCustom() && !isStartable()) ) // implicit UIMA-AS || implicit CUSTOM
- )
- {
- stopPingThread(); // must stop pinger because there's no state machine
- // to do it for us in this situation
+ if ( (references.size() == 0) && isReferencedStart() ) { // nothing left
+ lingeringStop();
}
+ logger.info(methodName, this.id, " Dereferences job/service", id, "count[" + references.size() + "]");
persistReferences();
- return references.size();
}
- public synchronized int countReferences()
+// public synchronized int countReferences()
+// {
+// // note that this could change as soon as you get it so don't count on it being correct
+// // this is intended only for messages that don't have to be too accurate
+// return references.size();
+// }
+
+ boolean containsImplementor(DuccId id)
{
- // note that this could change as soon as you get it so don't count on it being correct
- // this is intended only for messages that don't have to be too accurate
- return references.size();
+ return implementors.containsKey(id.getFriendly());
}
+
+ synchronized void signalRebalance(int nadditions, int ndeletions, boolean isExcessiveFailures)
+ {
+ String methodName = "signalRebalance";
+ logger.info(methodName, id,
+ "PING: Additions:", nadditions,
+ "deletions:", ndeletions,
+ "excessive failures:", isExcessiveFailures,
+ "implementors", countImplementors(),
+ "references", countReferences()
+ );
+
+ while ( true ) {
+ // Kids, don't try this at home!
+ // All paths MUST lead to break or we loop forever - using while/break as goto mechanism
+ if ( isPingOnly() ) {
+ ping_failures = 0; // ping only signals, we know it's functioning
+ break;
+ }
+
+ this.excessiveRunFailures = isExcessiveFailures;
+ if ( nadditions == ndeletions ) break;
+
+ if ( nadditions > 0) {
+ start(nadditions);
+ }
+
+ if ( ndeletions > 0 ) {
- private ServiceState translateJobState(JobState js)
- {
- switch ( js ) {
- case Received: // Job has been vetted, persisted, and assigned unique Id
- case WaitingForDriver: // Process Manager is launching Job Driver
- case WaitingForServices: // Service Manager is checking/starting services for Job
- return ServiceState.Waiting;
- case WaitingForResources: // Scheduler is assigning resources to Job
- case Initializing: // Process Agents are initializing pipelines
- return ServiceState.Initializing;
- case Running: // At least one Process Agent has reported process initialization complete
- return ServiceState.Available;
- case Completing: // Job processing is completing
- case Completed: // Job processing is completed
- case Undefined: // None of the above
- default:
- return ServiceState.NotAvailable;
+ if ( (countReferences() > 0) && (countImplementors() <= ndeletions) ) {
+ logger.info(methodName, id, "Bypass deletions because of active references");
+ break;
+ }
+ stop(ndeletions);
+ }
+ break; // required break
}
}
/**
- * The MAX of the states of the implementors.
+ * Based on the state of the dwj, can we delete this instance from the records?
*/
- private ServiceState cumulativeJobState()
+ boolean canDeleteInstance(DuccWorkJob dwj)
{
- ServiceState response = ServiceState.NotAvailable;
- for ( JobState s : implementors.values() ) {
- ServiceState translated = translateJobState(s);
- if ( translated.ordinality() > response.ordinality() ) response = translated;
- }
-
- // if we're stopped, we cannot advance global state beyond whatever it is, i.e. an "active"
- // implementor that hasn't quit yet is not allowed to progress state.
- if ( isStopped() ) {
- ServiceState current = getServiceState();
- if ( current.ordinality() < response.ordinality() ) {
- response = current;
- }
+ // These are the job states
+ // Received, // Job has been vetted, persisted, and assigned unique Id
+ // WaitingForDriver, // Process Manager is launching Job Driver
+ // WaitingForServices, // Service Manager is checking/starting services for Job
+ // WaitingForResources, // Scheduler is assigning resources to Job
+ // Initializing, // Process Agents are initializing pipelines
+ // Running, // At least one Process Agent has reported process initialization complete
+ // Completing, // Job processing is completing
+ // Completed, // Job processing is completed
+ // Undefined // None of the above
+
+ switch ( dwj.getJobState() ) {
+ case Completing:
+ case Completed:
+ return true;
+ default:
+ return false;
}
- return response;
}
- // synchronized void establish()
-// {
-// if ( implementors.size() == 0 ) {
-// startPingThread();
-// }
-// // Otherwise we let the form that passes in implementor state start the pinger
-// }
-
- public synchronized void establish()
+ /**
+ * This is one of my service instances. Update its state and maybe kick the
+ * state machine as well.
+ TODO: proof this carefully
+ */
+ synchronized void signalUpdate(DuccWorkJob dwj)
{
- String methodName = "establish.0";
+ String methodName = "signalUpdate";
+ ServiceInstance inst = implementors.get(dwj.getDuccId().getFriendly());
+
+ if ( inst == null ) { // he's gone and we don't care any more
+ logger.warn(methodName, id, "Process", dwj.getDuccId(), "is no longer an implementor. Perhaps it exited earlier.");
+ return;
+ }
- logger.debug(methodName, id, "service_class", service_class, "nimplementors", implementors.size(),
- "instances", instances);
- switch ( service_class ) {
- case Implicit:
- startPingThread();
- break;
- case Submitted:
- for ( DuccId id : implementors.keySet() ) { // there's only one
- establish(id, implementors.get(id));
- }
- break;
- case Registered:
- // use friendly to find number of potential instances because it reflects both
- // the number of actually running instances plus those just started but not yet
- // checked with their ducc ids yet.
+ JobState old_state = inst.getState();
+ JobState state = dwj.getJobState();
+ DuccId inst_id = dwj.getDuccId();
+ long fid = inst_id.getFriendly();
+
+ if ( state == JobState.Running && old_state != JobState.Running ) {
+ // running, and wasn't before, we can reset the error counter
+ logger.info(methodName, id, "Resetting init error counter from", init_failures, "to 0 on transition from", old_state, "to", state);
+ init_failures = 0;
+ }
- if ( friendly_ids.size() > 0 ) {
- for ( DuccId id : implementors.keySet() ) { // there's only one
- establish(id, implementors.get(id));
- }
+ if ( canDeleteInstance(dwj) ) {
+ // State Completed or Completing
+ JobCompletionType jct = dwj.getCompletionType();
+ ServiceInstance stoppedInstance = null;
+
+ logger.info(methodName, this.id, "Removing implementor", fid, "(", key, ") completion", jct);
+ stoppedInstance = implementors.remove(fid); // won't fail, was checked for on entry
+
+ // TODO: put history into a better place
+ String history = meta_props.getStringProperty(history_key, "");
+ history = history + " " + fid;
+ meta_props.put(history_key, history);
+ saveMetaProperties();
+
+ logger.info(methodName, id, "Removing stopped instance", inst_id, "from maps: state[", state, "] completion[", jct, "] isStopped", isStopped());
+
+ clearQueue(); // this won't do anything if it looks like the service is still active somehow
+
+ if ( instances > countImplementors() ) { // have we fallen short of the nInstances we have to maintain?
+
+ // You can stop an instance with the ducc_services CLI, in which case this counts as a manual stop and not
+ // an error. Or the thing can go away for no clear reason, in which case it does as an error, even if somebody
+ // use the DuccServiceCancel API to stop it.
+ //
+ // TODO: Update the ducc_services CLI to allow stop and restart of specific instances without counting failure.
+ if ( stoppedInstance.isStopped() ) {
+ logger.info(methodName, id, "Instance", inst_id, "is manually stopped. Not restarting.");
} else {
- if ( service_type == ServiceType.Custom ) {
- startPingThread();
- } else {
- int needed = Math.max(0, instances - friendly_ids.size());
- while ( (needed--) > 0 ) {
- start();
+ // An instance stopped and we (SM) didn't ask it to - by definition this is failure no matter how it exits.
+
+ switch ( old_state ) {
+ case Initializing:
+ init_failures++;
+ logger.info(methodName, id, "Tally initialization failure:", init_failures);
+ break;
+ case Running:
+ run_failures++;
+ logger.info(methodName, id, "Tally runtime failure", run_failures);
+ break;
+ default:
+ // other states we blow off - we can enter this place a bunch of time a things wind down
+ break;
+ }
+
+ if ( excessiveFailures() ) {
+ if ( excessiveRunFailures ) {
+ logger.warn(methodName, id, "Instance", inst_id, "Monitor signals excessive terminations. Not restarting.");
+ } else {
+ logger.warn(methodName, id, "Instance", inst_id,
+ "Excessive initialization failures. Total failures[" + init_failures + "]",
+ "allowed [" + init_failures_max + "], not restarting.");
}
+ } else {
+ logger.warn(methodName, id, "Instance", inst_id + ": Uunsolicited termination, not yet excessive. Restarting instance.");
+ start(1);
+ return; // don't use termination to set state - start will signal the state machine
}
}
- break;
- }
+ }
+ }
+
+ inst.setState(state);
+ signal(inst);
+ }
+
+ private ServiceState translateJobState(JobState js)
+ {
+ switch ( js ) {
+ case Received: // Job has been vetted, persisted, and assigned unique Id
+ case WaitingForDriver: // Process Manager is launching Job Driver
+ case WaitingForServices: // Service Manager is checking/starting services for Job
+ case WaitingForResources: // Scheduler is assigning resources to Job
+ return ServiceState.Starting;
+ case Initializing: // Process Agents are initializing pipelines
+ return ServiceState.Initializing;
+ case Running: // At least one Process Agent has reported process initialization complete
+ return ServiceState.Available;
+ case Completing: // Job processing is completing
+ return ServiceState.Stopping;
+ case Completed: // Job processing is completed
+ return ServiceState.Stopped;
+ default:
+ return ServiceState.NotAvailable; // Should not ever get here. It's a noop if we do.
+ }
}
/**
- * Starts a ping thread because I've started up and I need to monitor myself.
+ * The MAX of the states of the implementors.
+ * case Available: return 8;
+ * case Waiting: return 7;
+ * case Initializing: return 6;
+ * case Starting: return 5;
+ * case Stopping: return 4;
+ * case Stopped: return 3;
+ * case NotAvailable: return 2;
+ * case Undefined: return 1;
+ * default: return 0;
*/
- public synchronized void establish(DuccId id, JobState job_state)
+ private ServiceState cumulativeJobState()
{
- String methodName = "establish.1";
+ String methodName = "cumulativeJobState";
+ ServiceState response = ServiceState.Stopped;
- if ( service_class == ServiceClass.Implicit ) {
- startPingThread();
- return;
+ for ( ServiceInstance si : implementors.values() ) {
+ JobState js = si.getState();
+ ServiceState translated = translateJobState(js);
+ if ( translated.ordinality() > response.ordinality() ) response = translated;
+ }
+
+ // If there's no pinger we don't adjust, so the state machine can do magic to start one.
+ // If there is a pinger, and it isn't pinging, we must not advance beyond the pinger's state.
+ if ( serviceMeta != null ) {
+ logger.trace(methodName, id, "Cumulative before checking monitor/pinger:", response, ". Monitor state:", serviceMeta.getServiceState());
+ if ( serviceMeta.getServiceState().ordinality() <= response.ordinality() ) response = serviceMeta.getServiceState();
}
- if ( service_class == ServiceClass.Submitted ) {
- //
- // Annoying edge case - cancel a service, then resubmit before the first one has time
- // to wind down - need to be sure that the winding-down service is not handled by the
- // new instance's state machine.
- //
- if ( ! implementors.containsKey(id) ) {
- logger.debug(methodName, id, "Submitted service: Skipping state machine because the service set has no implemetor for", id);
- return;
+ // It can take a while for instance state to catch up with stopping, so we override it here if needed
+ if ( isStopped() ) {
+ if ( ServiceState.Stopping.ordinality() < response.ordinality() ) {
+ logger.info(methodName, id, "Adjust state to", ServiceState.Stopping, "from", response, "because of service stop.");
+ response = ServiceState.Stopping;
}
}
+ return response;
+ }
+
+ synchronized ServiceState getState()
+ {
+ return service_state;
+ }
+
+ synchronized void setState(ServiceState new_state, ServiceState cumulative, ServiceInstance si)
+ {
+ String methodName = "setState";
+
+ String tail = "";
+ if ( si == null ) {
+ tail = "none/none";
+ } else {
+ tail = si.getId() + "/" + si.getState();
+ }
+
+ ServiceState prev = this.service_state;
+ this.service_state = new_state;
+ if ( prev != new_state ) {
+ logger.info(methodName, id, "State update from[" + prev + "] to[" + new_state + "] via[" + cumulative + "] Inst[" + tail + "]" );
+ }
+ saveMetaProperties();
+
+ // Execute actions that must always occur based on the new state
+ // These are all idempotent actions, call them as often as you want and no harm.
+ switch(new_state) {
+ case Available:
+ startPingThread();
+ break;
+ case Initializing:
+ break;
+ case Starting:
+ break;
+ case Waiting:
+ startPingThread();
+ break;
+ case Stopping:
+ stopPingThread();
+ break;
+ case Stopped:
+ stopPingThread();
+ break;
+ default:
+ stopPingThread();
+ break;
+ }
+ }
+
+ public synchronized void signal(ServiceInstance si)
+ {
+ String methodName = "signal";
+
if ( true ) {
- implementors.put(id, job_state);
ServiceState cumulative = cumulativeJobState();
//
// Note on the CUMULATIVE state: this is the cumulative state as determined by service processes. If they
@@ -978,141 +1117,224 @@ public class ServiceSet
// not yet pinging we need to see if any of the implementors states
// indicates we should be pinging, in which case, start the pinger.
//
- logger.debug(methodName, id, "serviceState", getServiceState(), "cumulativeState", cumulative);
- switch ( getServiceState() ) {
+ logger.trace(methodName, id, "serviceState", getState(), "cumulativeState", cumulative);
+ switch ( getState() ) {
// If I'm brand new and something is initting then I can be too. If something is
// actually running then I can start a pinger which will set my state.
- case Undefined:
+
+ case Available:
switch ( cumulative ) {
+ case Starting:
+ logger.warn(methodName, id, "STATE REGRESSION:", getState(), "->", cumulative); // can't do anything about it but complain
+ setState(ServiceState.Starting, cumulative, si);
+ break;
+
case Initializing:
- setServiceState(ServiceState.Initializing);
+ // Not immediately clear what would cause this other than an error but let's not crash.
+ logger.warn(methodName, id, "STATE REGRESSION:", getState(), "->", cumulative); // can't do anything about it but complain
+ setState(ServiceState.Initializing, cumulative, si);
break;
+
case Available:
- startPingThread();
+ setState(ServiceState.Available, cumulative, si);
break;
+
+ case Stopping:
+ setState(ServiceState.Stopping, cumulative, si);
+ break;
+
+ case Stopped:
+ setState(ServiceState.Stopped, cumulative, si);
+ break;
+
case Waiting:
- setServiceState(ServiceState.Waiting);
+ setState(ServiceState.Waiting, cumulative, si);
break;
- case NotAvailable:
- break;
+
+ default:
+ stopPingThread();
+ logger.warn(methodName, id, "ILLEGAL STATE TRANSITION:", getState(), "->", cumulative);
+ break;
+
}
+
break;
// If I'm initting and now something is running we can start a pinger
case Initializing:
switch ( cumulative ) {
- case Available:
- startPingThread();
+ case Starting:
+ logger.warn(methodName, id, "STATE REGRESSION:", getState(), "->", cumulative); // can't do anything about it but complain
+ setState(ServiceState.Starting, cumulative, si);
break;
+
case Initializing:
+ setState(ServiceState.Initializing, cumulative, si);
break;
- case Waiting:
- setServiceState(ServiceState.Initializing);
- break;
- case NotAvailable:
- if ( failure_run >= failure_max ) {
- setServiceState(ServiceState.NotAvailable);
- stopPingThread();
- } else {
- // don't regress if we're in retry
- logger.info(methodName, id, "RETRY RETRY RETRY prevents state regression from Initializing");
- }
- break;
- }
- break;
- case NotAvailable:
- switch ( cumulative ) {
case Available:
- startPingThread();
- setServiceState(ServiceState.Available);
+ setState(ServiceState.Waiting, cumulative, si);
break;
- case Initializing:
- setServiceState(ServiceState.Initializing);
+
+ case Stopping:
+ setState(ServiceState.Stopping, cumulative, si);
break;
+
+ case Stopped:
+ setState(ServiceState.Stopped, cumulative, si);
+ break;
+
case Waiting:
- setServiceState(ServiceState.Waiting);
+ setState(ServiceState.Initializing, cumulative, si);
+ logger.warn(methodName, id, "ILLEGAL STATE TRANSITION:", getState(), "->", cumulative);
break;
- case NotAvailable:
+
+ default:
+ logger.warn(methodName, id, "ILLEGAL STATE TRANSITION:", getState(), "->", cumulative);
break;
}
break;
- case Available:
- switch ( cumulative ) {
- case Available:
- startPingThread();
+ // If I'm initting and now something is running we can start a pinger
+ case Starting:
+ switch ( cumulative ) {
+ case Starting:
+ setState(ServiceState.Starting, cumulative, si);
break;
+
case Initializing:
- // Not immediately clear what would cause this other than an error but let's not crash.
- logger.warn(methodName, id, "STATE REGRESSION:", getServiceState(), "->", cumulative); // can't do anything about it but complain
- setServiceState(ServiceState.Initializing);
+ setState(ServiceState.Initializing, cumulative, si);
break;
- case NotAvailable:
- if ( failure_run >= failure_max ) {
- setServiceState(ServiceState.NotAvailable);
- stopPingThread();
- } else {
- // don't regress if we're in retry
- logger.info(methodName, id, "RETRY RETRY RETRY prevents state regression from Available");
- }
+
+ case Available:
+ setState(ServiceState.Waiting, cumulative, si);
+ break;
+
+ case Stopping:
+ logger.info(methodName, id, "RETRY RETRY RETRY prevents state regression from Initializing");
break;
- }
+ case Stopped:
+ setState(ServiceState.Stopped, cumulative, si);
+ break;
+
+ case Waiting:
+ logger.warn(methodName, id, "ILLEGAL STATE TRANSITION:", getState(), "->", cumulative);
+ break;
+
+ }
break;
case Waiting:
switch ( cumulative ) {
- case Available:
+ case Starting:
+ logger.warn(methodName, id, "STATE REGRESSION:", getState(), "->", cumulative); // can't do anything about it but complain
+ setState(ServiceState.Starting, cumulative, si);
break;
+
case Initializing:
- // state regression can happen with a promoted implicit service, where the service is referenced
- // and a pinger starts before the actual service is available. the ping keeps us in 'waiting' state
- // up until expiration, but if the service is actually initializing, we regress.
- logger.warn(methodName, id, "STATE REGRESSION:", getServiceState(), "->", cumulative); // can't do anything about it but complain
- setServiceState(ServiceState.Initializing);
+ logger.warn(methodName, id, "STATE REGRESSION:", getState(), "->", cumulative); // can't do anything about it but complain
+ setState(ServiceState.Initializing, cumulative, si);
+ break;
+
+ case Available:
+ setState(ServiceState.Available, cumulative, si);
break;
+
+ case Stopping:
+ setState(ServiceState.Stopping, cumulative, si);
+ break;
+
+ case Stopped:
+ setState(ServiceState.Stopped, cumulative, si);
+ break;
+
case Waiting:
+ setState(ServiceState.Waiting, cumulative, si);
break;
- case NotAvailable:
- if ( failure_run >= failure_max ) {
- setServiceState(ServiceState.NotAvailable);
- } else {
- // don't regress if we're in retry
- logger.info(methodName, id, "RETRY RETRY RETRY prevents state regression from Waiting");
- }
- stopPingThread();
+
+ default:
+ logger.warn(methodName, id, "ILLEGAL STATE TRANSITION:", getState(), "->", cumulative);
+
break;
+
}
break;
case Stopping:
switch ( cumulative ) {
- case Available:
+
+ case Starting:
+ setState(ServiceState.Starting, cumulative, si);
+ break;
+
case Initializing:
- case Waiting:
+ setState(ServiceState.Initializing, cumulative, si);
+ break;
+
+
+ case Available:
+ setState(ServiceState.Available, cumulative, si);
break;
- case NotAvailable:
- // all the implementors died finally
- setServiceState(ServiceState.NotAvailable);
+ case Stopped:
+ setState(ServiceState.Stopped, cumulative, si);
+ break;
+
+ case Stopping:
+ setState(ServiceState.Stopping, cumulative, si);
+ break;
+
+ default:
+ logger.warn(methodName, id, "ILLEGAL STATE TRANSITION:", getState(), "->", cumulative);
break;
}
break;
- }
- }
- }
+ case Stopped:
+ // OK
+ // Every transition can happen here because of hot-start of SM
+ switch ( cumulative ) {
+ case Starting:
+ setState(ServiceState.Starting, cumulative, si);
+ break;
- synchronized ServiceState getServiceState()
- {
- return service_state;
- }
+ case Initializing:
+ setState(ServiceState.Initializing, cumulative, si);
+ break;
+
+ case Available:
+ setState(ServiceState.Waiting, cumulative, si);
+ break;
+
+ case Waiting:
+ setState(ServiceState.Waiting, cumulative, si);
+ break;
+ case Stopped:
+ // Trailing OR publications cause this. Just record it for the log.
+ setState(ServiceState.Stopped, cumulative, si);
+ break;
+
+ case Stopping:
+ setState(ServiceState.Stopping, cumulative, si);
+ logger.warn(methodName, id, "UNEXPECTED STATE:", getState(), "->", cumulative);
+ break;
+
+ case NotAvailable:
+ // junk. just ignore it
+ logger.warn(methodName, id, "UNEXPECTED STATE:", getState(), "->", cumulative);
+ break;
+ }
+ break;
+
+ case NotAvailable:
+ case Undefined:
+ // OK
+ logger.warn(methodName, id, "Illiegal state", getState(), "Ignored.");
+ break;
+ }
+ }
- synchronized void setServiceState(ServiceState ss)
- {
- this.service_state = ss;
- saveMetaProperties();
}
synchronized String getKey()
@@ -1120,19 +1342,27 @@ public class ServiceSet
return key;
}
- void resetRunFailures()
+ synchronized int getRunFailures()
{
- failure_run = 0;
+ return run_failures;
}
- synchronized boolean excessiveRunFailures()
+ /**
+ * Analyze failures - either too many init failures, or the pinger says too many run failures.
+ */
+ synchronized boolean excessiveFailures()
{
- String methodName = "runFailures";
- if ( (++failure_run) >= failure_max ) {
- logger.debug(methodName, id, "RUN FAILURES EXCEEDED");
+ String methodName = "excessiveFailures";
+ if ( init_failures >= init_failures_max ) {
+ logger.trace(methodName, id, "INIT FAILURES EXCEEDED");
+ return true;
+ }
+
+ if ( excessiveRunFailures ) {
+ logger.trace(methodName, id, "EXCESSIVE RUN FAILURES SIGNALLED FROM SERVICE MONITOR.");
return true;
}
- logger.debug(methodName, id, "RUN FAILURES NOT EXCEEDED YET", failure_run);
+
return false;
}
@@ -1142,41 +1372,35 @@ public class ServiceSet
if ( serviceMeta != null ) return; // don't start multiple times.
try {
- logger.info(methodName, id, "Starting ping/monitor.");
+ logger.info(methodName, id, "Starting service monitor.");
serviceMeta = new PingDriver(this);
} catch ( Throwable t ) {
- logger.error(methodName, id, "Cannot instantiate ping/monitor.", t);
+ logger.error(methodName, id, "Cannot instantiate service monitor.", t);
return;
}
- setServiceState(ServiceState.Waiting);
+ //setState(ServiceState.Waiting);
Thread t = new Thread(serviceMeta);
t.start();
}
- synchronized void pingExited()
+ synchronized void pingExited(int rc, PingDriver which_meta)
{
String methodName = "pingExited";
- if ( serviceMeta != null ) {
- logger.warn(methodName, id, "Pinger exited voluntarily, setting state to Undefined. Endpoint", endpoint);
- setServiceState(ServiceState.Undefined); // not really sure what state is. it will be
-
- // checked and updated next run through the
- // main state machine, and maybe ping restarted.
- residualMeta = serviceMeta;
+ logger.info(methodName, id, "Service Monitor/Pinger exits, rc", rc);
+ if ( which_meta == serviceMeta ) {
serviceMeta = null;
- } else {
- if ( ! isStopped() ) { // state may still be Stopping, don't over regress state
- setServiceState(ServiceState.NotAvailable);
+ } // otherwise, it was already removed by some intrepid unit
+
+ if ( isPingOnly() && (rc != 0) ) {
+ if ( ++ping_failures > ping_failures_max ) {
+ logger.info(methodName, id, "Stopping pinger due to excessive falutes:", ping_failures_max);
+ // stop();
+ } else {
+ logger.info(methodName, id, "Ping-only pinger exits with error rc", rc, "total errors:", ping_failures);
}
}
-
- if ( isImplicit() ) {
- deleteProperties();
- } else {
- saveMetaProperties();
- }
}
public synchronized void stopPingThread()
@@ -1184,77 +1408,14 @@ public class ServiceSet
String methodName = "stopPingThread";
if ( serviceMeta != null ) {
- logger.debug(methodName, id, "Stopping ping thread, endpoint", endpoint);
+ logger.info(methodName, id, "Stopping monitor/ping thread for", key);
serviceMeta.stop();
- residualMeta = serviceMeta;
serviceMeta = null;
}
- if ( !isRegistered() ) {
- deleteProperties();
- } else {
- saveMetaProperties();
- }
- }
-
- synchronized void setResponsive()
- {
- setServiceState(ServiceState.Available);
saveMetaProperties();
}
- synchronized void setUnresponsive()
- {
- setServiceState(ServiceState.NotAvailable);
- saveMetaProperties();
- }
-
- synchronized void setWaiting()
- {
- //
- // Only switch state sometimes ....
- //
- switch ( getServiceState() ) {
- case Available:
- case NotAvailable:
- case Undefined:
- setServiceState(ServiceState.Waiting);
- break;
- case Initializing:
- case Waiting:
- case Stopping:
- break;
- }
- }
-
-// boolean ping()
-// {
-// //String methodName = "ping";
-// boolean answer = true;
-//
-// // Instantiate Uima AS Client
-// BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
-// Map<String, Object> appCtx = new HashMap<String, Object>();
-// appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
-// appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
-// appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, ServiceManagerComponent.meta_ping_timeout); // 500 ms should be enough to get GetMeta reply
-//
-// try {
-// // this sends GetMeta request and blocks waiting for a reply
-// uimaAsEngine.initialize(appCtx);
-// // logger.info(methodName, null, "Dependent Service Available:", getKey());
-// } catch( ResourceInitializationException e) {
-// // either broker is down or service not available
-// // logger.error(methodName, null, "Remote service unavailable:", getKey());
-// answer = false;
-// } finally {
-// uimaAsEngine.stop();
-// }
-//
-// return answer;
-// }
-
-
void log_text(String logdir, String text)
{
String methodName = "log_text";
@@ -1273,14 +1434,12 @@ public class ServiceSet
try {
Process p = pb.start();
int rc = p.waitFor();
- logger.debug(methodName, null, "Log start errors returns with rc", rc);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ if ( rc != 0 ) {
+ logger.warn(methodName, id, "Attempt to update user's service.err.log returns with rc ", rc);
+ }
+ } catch (Throwable t) {
+ logger.warn(methodName, id, "Cannot update user's service.err.log:", t);
+ }
}
@@ -1324,287 +1483,88 @@ public class ServiceSet
}
- /**
- * This assumes the caller has already verified that I'm a registered service.
- */
- boolean start()
+ synchronized void start(int ninstances)
{
String methodName = "start";
- logger.debug(methodName, null, "START START START START START START START START");
- this.stopped = false;
-
- if ( ! isStartable() ) {
- establish(); // this will just start the ping thread
- return true;
- }
-
- // Simple use of ducc_ling, just submit as the user. The specification will have the working directory
- // and classpath needed for the service, handled by the Orchestrator and Job Driver.
- String[] args = {
- System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
- "-u",
- user,
- "--",
- System.getProperty("ducc.jvm"),
- "-cp",
- System.getProperty("java.class.path"),
- "org.apache.uima.ducc.cli.DuccServiceSubmit",
- "--specification",
- props_filename
- };
-
- for ( int i = 0; i < args.length; i++ ) {
- if ( i > 0 && (args[i-1].equals("-cp") ) ) {
- // The classpaths can be just awful filling the logs with junk. It will end up in the agent log
- // anyway so let's inhibit it here.
- logger.debug(methodName, null, "Args[", i, "]: <CLASSPATH>");
- } else {
- logger.debug(methodName, null, "Args[", i, "]:", args[i]);
- }
- }
-
- ProcessBuilder pb = new ProcessBuilder(args);
- Map<String, String> env = pb.environment();
- env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));
-
- ArrayList<String> stdout_lines = new ArrayList<String>();
- ArrayList<String> stderr_lines = new ArrayList<String>();
- try {
- Process p = pb.start();
-
- int rc = p.waitFor();
- logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
-
- // TODO: we should attache these streams to readers in threads because too much output
- // can cause blocking, deadlock, ugliness.
- InputStream stdout = p.getInputStream();
- InputStream stderr = p.getErrorStream();
- BufferedReader stdout_reader = new BufferedReader(new InputStreamReader(stdout));
- BufferedReader stderr_reader = new BufferedReader(new InputStreamReader(stderr));
- String line = null;
- while ( (line = stdout_reader.readLine()) != null ) {
- stdout_lines.add(line);
- }
-
- line = null;
- while ( (line = stderr_reader.readLine()) != null ) {
- stderr_lines.add(line);
- }
-
- } catch (Throwable t) {
- // TODO Auto-generated catch block
- logger.error(methodName, null, t);
- }
-
- for ( String s : stderr_lines ) {
- logger.info(methodName, id, "Start stderr:", s);
- }
-
- // That was annoying. Now search the lines for some hint of the id.
- boolean inhibit_cp = false;
- boolean started = false;
- StringBuffer submit_buffer = new StringBuffer();
- boolean recording = false;
- for ( String s : stdout_lines ) {
-
- // simple logic to inhibit printing the danged classpath
- if ( inhibit_cp ) {
- inhibit_cp = false;
- logger.info(methodName, id, "<INHIBITED CP>");
- } else {
- logger.info(methodName, id, "Start stdout:", s);
- }
-
- if ( s.indexOf("-cp") >= 0 ) {
- inhibit_cp = true;
+ if ( isPingOnly() ) {
+ if ( implementors.containsKey(-1l) ) {
+ logger.info(methodName, id, "PING_ONLY: already started.");
+ return;
}
- if ( recording ) {
- submit_buffer.append(s.trim());
- submit_buffer.append(";");
- }
- if ( s.startsWith("1001 Command launching...") ) {
- recording = true;
- continue;
- }
+ ServiceInstance si = new PingOnlyServiceInstance(this);
+ si.setId(-1);
+ si.setUser(this.user);
+ implementors.put(-1l, si);
+ handler.addInstance(this, si);
+ si.start(null, null);
+ signal(si);
+ } else {
- if ( s.startsWith("Service") && s.endsWith("submitted") ) {
- String[] toks = s.split("\\s");
- long friendly = 0;
- try {
- friendly = Long.parseLong(toks[1]);
- friendly_ids.put(friendly, null);
- persistImplementors();
- started = true;
- logger.info(methodName, null, "Request to start service " + id.toString() + " accepted as job ", friendly);
- } catch ( NumberFormatException e ) {
- logger.warn(methodName, null, "Request to start service " + id.toString() + " failed, can't interpret response.: " + s);
+ for ( int i = 0; i < ninstances; i++ ) {
+ ServiceInstance si = new ServiceInstance(this);
+ long instid = -1;
+ logger.info(methodName, id, "Starting instance", i);
+ if ( (instid = si.start(props_filename, meta_props)) >= 0 ) {
+ logger.info(methodName, id, "Instance[", i, "] id ", instid);
+ implementors.put(instid, si);
+ handler.addInstance(this, si);
+ signal(si);
+ } else {
+ logger.info(methodName, id, "Instance[", i, "] id ", instid, "Failed to start.");
+ setAutostart(false);
+ signal(si);
+ break;
}
+ }
+ }
- }
- }
-
- boolean rc = true;
- if ( ! started ) {
- logger.warn(methodName, null, "Request to start service " + id.toString() + " failed.");
- meta_props.put("submit_error", submit_buffer.toString());
- setAutostart(false);
- log_errors(stdout_lines, stderr_lines);
- } else {
- meta_props.remove("submit_error");
- setServiceState(ServiceState.Initializing);
- }
saveMetaProperties();
- logger.debug(methodName, null, "ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART");
- return rc;
}
/**
- * This assumes the caller has already verified that I'm a registered service.
+ * Stop 'count' services.
*/
- void stopOneProcess(DuccId id)
+ synchronized void stop(int count)
{
- String methodName = "stop";
-
- String[] args = {
- System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
- "-u",
- user,
- "--",
- System.getProperty("ducc.jvm"),
- "-cp",
- System.getProperty("java.class.path"),
- "org.apache.uima.ducc.cli.DuccServiceCancel",
- "--id",
- id.toString()
- };
-
- for ( int i = 0; i < args.length; i++ ) {
- if ( i > 0 && (args[i-1].equals("-cp") ) ) {
- // The classpaths can be just awful filling the logs with junk. It will end up in the agent log
- // anyway so let's inhibit it here.
- logger.debug(methodName, null, "Args[", i, "]: <CLASSPATH>");
- } else {
- logger.debug(methodName, null, "Args[", i, "]:", args[i]);
- }
- }
-
- ProcessBuilder pb = new ProcessBuilder(args);
- Map<String, String> env = pb.environment();
- env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));
-
- ArrayList<String> stdout_lines = new ArrayList<String>();
- ArrayList<String> stderr_lines = new ArrayList<String>();
- try {
- Process p = pb.start();
-
- int rc = p.waitFor();
- logger.debug(methodName, null, "DuccServiceCancel returns with rc", rc);
+ String methodName = "stop(count)";
- InputStream stdout = p.getInputStream();
- InputStream stderr = p.getErrorStream();
- BufferedReader stdout_reader = new BufferedReader(new InputStreamReader(stdout));
- BufferedReader stderr_reader = new BufferedReader(new InputStreamReader(stderr));
-
- String line = null;
- while ( (line = stdout_reader.readLine()) != null ) {
- stdout_lines.add(line);
- }
-
- line = null;
- while ( (line = stderr_reader.readLine()) != null ) {
- stderr_lines.add(line);
- }
-
- } catch (Throwable t) {
- // TODO Auto-generated catch block
- logger.error(methodName, null, t);
+ logger.info(methodName, id, "Stopping", count, "implementors");
+
+ if ( count >= implementors.size() ) {
+ stopPingThread();
+ setStopped();
}
- boolean inhibit_cp = false;
- for ( String s : stdout_lines ) {
- // simple logic to inhibit printing the danged classpath
- if ( inhibit_cp ) {
- inhibit_cp = false;
- logger.info(methodName, id, "<INHIBITED CP>");
+ for ( ServiceInstance si: implementors.values() ) {
+ if ( (count--) > 0 ) {
+ si.stop();
+ signal(si);
} else {
- logger.info(methodName, id, "Stop stdout:", s);
- }
-
- if ( s.indexOf("-cp") >= 0 ) {
- inhibit_cp = true;
+ break;
}
}
-
- for ( String s : stderr_lines ) {
- logger.info(methodName, id, "Stop stderr:", s);
- }
-
- // is this the last implementor? if so the service is no longer available.
- // should not have to do this, the state should update correctly in the state machine
- //if ( implementors.size() <= 1 ) {
- // service_state = ServiceState.NotAvailable;
- //}
- //return new ServiceReplyEvent(ServiceCode.OK, "Start service " + id.toString() + " complete.", toks[1], null);
-
}
/**
* Stop everything
*/
- void stop()
+ synchronized void stop()
{
- String methodName = "stop";
- logger.debug(methodName, id, "Stopping all implementors");
- this.stopped = true;
- stopPingThread();
- setServiceState(ServiceState.Stopping);
-
- if ( ! isStartable() ) { // CUSTOM ping-only service
- return;
- }
-
- for ( DuccId id : implementors.keySet() ) {
- stopOneProcess(id);
- }
- saveMetaProperties();
+ stop(implementors.size());
}
- /**
- * Stop 'count' services.
- * TODO: Put in logic to stop intelligently, i.e. favor processes not yet running
- */
- void stop(int count)
- {
- String methodName = "stop(count)";
-
- if ( ! isStartable() ) { // CUSTOM ping-only, only one running, let common code do the honors
- stop();
- return;
- }
-
- logger.debug(methodName, id, "Stopping", count, "implementors");
- for ( DuccId id: implementors.keySet() ) {
- if ( (count--) > 0 ) {
- stopOneProcess(id);
- } else {
- break;
- }
- }
- saveMetaProperties();
- }
-
-
private class LingerTask
extends TimerTask
{
- ServiceSet sset;
- LingerTask(ServiceSet sset)
+ //ServiceSet sset;
+ //LingerTask(ServiceSet sset)
+ LingerTask()
{
String methodName = "LingerTask.init";
logger.debug(methodName, id, "Linger starts", linger_time);
- this.sset = sset;
+ //this.sset = sset;
}
public void run()
@@ -1612,9 +1572,8 @@ public class ServiceSet
String methodName = "LingerTask.run";
logger.debug(methodName, id, "Lingering stop completes.");
// doesn't matter how its started i think, we have to set this flag off when we stop
- sset.setReferencedStart(false);
linger = null;
- sset.stop();
+ stop();
}
}
@@ -1623,7 +1582,8 @@ public class ServiceSet
if ( timer == null ) {
timer = new Timer();
}
- linger = new LingerTask(this);
+ //linger = new LingerTask(this);
[... 70 lines stripped ...]