You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:45:29 UTC

svn commit: r1427972 [1/3] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/sm/ main/java/org/apache/uim...

Author: cwiklik
Date: Wed Jan  2 19:45:28 2013
New Revision: 1427972

URL: http://svn.apache.org/viewvc?rev=1427972&view=rev
Log:
UIMA-2491

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ApiHandler.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/CustomServiceMeta.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceManager.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceEndpointReader.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSpecifier.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/SmConstants.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaServiceMeta.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/config/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/config/ServiceManagerConfiguration.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/event/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/event/ServiceManagerEventListener.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/resources/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/test/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/test/java/
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/test/resources/

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ApiHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ApiHandler.java?rev=1427972&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ApiHandler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ApiHandler.java Wed Jan  2 19:45:28 2013
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+import org.apache.uima.ducc.transport.event.ServiceModifyEvent;
+import org.apache.uima.ducc.transport.event.ServiceStartEvent;
+import org.apache.uima.ducc.transport.event.ServiceStopEvent;
+import org.apache.uima.ducc.transport.event.ServiceUnregisterEvent;
+
+
+/**
+ * This class runs API commands in a thread, allowing the API to return quickly while the
+ * work proceeds in the background.
+ *
+ * It's just a threaded front-end to the API implementations in ServiceHandler.
+ */
+class ApiHandler
+    implements SmConstants,
+               Runnable
+{
+    // Verb chosen by the constructor that was used; run() dispatches on it.
+    ServiceVerb cmd;
+    ServiceHandler serviceHandler;
+    long friendly;
+    String endpoint;
+    int instances;
+    Trinary autostart;
+    boolean update;
+    boolean activate;
+
+    /** Captures an unregister request so that run() can execute it later. */
+    ApiHandler(ServiceUnregisterEvent event, ServiceHandler serviceHandler) {
+        this.serviceHandler = serviceHandler;
+        cmd      = ServiceVerb.Unregister;
+        friendly = event.getFriendly();
+        endpoint = event.getEndpoint();
+    }
+
+    /** Captures a start request so that run() can execute it later. */
+    ApiHandler(ServiceStartEvent event, ServiceHandler serviceHandler) {
+        this.serviceHandler = serviceHandler;
+        cmd       = ServiceVerb.Start;
+        friendly  = event.getFriendly();
+        endpoint  = event.getEndpoint();
+        instances = event.getInstances();
+        update    = event.getUpdate();
+    }
+
+    /** Captures a stop request so that run() can execute it later. */
+    ApiHandler(ServiceStopEvent event, ServiceHandler serviceHandler) {
+        this.serviceHandler = serviceHandler;
+        cmd       = ServiceVerb.Stop;
+        friendly  = event.getFriendly();
+        endpoint  = event.getEndpoint();
+        instances = event.getInstances();
+        update    = event.getUpdate();
+    }
+
+    /** Captures a modify request so that run() can execute it later. */
+    ApiHandler(ServiceModifyEvent event, ServiceHandler serviceHandler) {
+        this.serviceHandler = serviceHandler;
+        cmd       = ServiceVerb.Modify;
+        friendly  = event.getFriendly();
+        endpoint  = event.getEndpoint();
+        instances = event.getInstances();
+        autostart = event.getAutostart();
+        activate  = event.getActivate();
+    }
+
+    /** Calls the ServiceHandler operation matching the captured verb; other verbs do nothing. */
+    public void run() {
+        switch ( cmd ) {
+            case Unregister:
+                serviceHandler.doUnregister(friendly, endpoint);
+                break;
+
+            case Modify:
+                serviceHandler.doModify(friendly, endpoint, instances, autostart, activate);
+                break;
+
+            case Stop:
+                serviceHandler.doStop(friendly, endpoint, instances, update);
+                break;
+
+            case Start:
+                serviceHandler.doStart(friendly, endpoint, instances, update);
+                break;
+        }
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ApiHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/CustomServiceMeta.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/CustomServiceMeta.java?rev=1427972&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/CustomServiceMeta.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/CustomServiceMeta.java Wed Jan  2 19:45:28 2013
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+
+import org.apache.uima.ducc.common.ServiceStatistics;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccProperties;
+
+
+/**
+ * This runs the watchdog thread for custom service pingers.
+ *
+ * It spawns a process, as the user, which in turn will instantiate an object which extends
+ * AServiceMeta to implement the pinger.
+ *
+ * The processes communicate via a pipe: every ping interval the meta puts relevant information onto its
+ * stdout:
+ *     0|1 long long
+ * The first token is 1 if the ping succeeded, 0 otherwise.
+ * The second token is the total cumulative work executed by the service.
+ * The third token is the current queue depth of the service.       
+ */
+
+class CustomServiceMeta
+    implements IServiceMeta,
+               SmConstants
+{
+    private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);	
+
+    String[] jvm_args;
+    String endpoint;
+    String ping_class;
+    String classpath;
+    boolean ping_ok;
+    int missed_pings = 0;
+    ServiceSet sset;
+    boolean test_mode = false;
+
+    Process ping_main;
+
+    StdioListener sin_listener = null;
+    StdioListener ser_listener = null;
+    PingThread pinger = null;
+
+    int meta_ping_rate;
+    int meta_ping_stability;
+    ServiceStatistics service_statistics = null;
+
+    String user;
+    String working_directory;
+    String log_directory;
+
+    /** Builds the watchdog from the job and meta properties of the given service set. */
+    CustomServiceMeta(ServiceSet sset) {
+        this.sset = sset;
+        DuccProperties job  = sset.getJobProperties();
+        DuccProperties meta = sset.getMetaProperties();
+
+        String rawJvmArgs = job.getStringProperty("service_custom_jvm_args");
+        endpoint          = job.getStringProperty("service_custom_endpoint");
+        ping_class        = job.getStringProperty("service_custom_ping");
+        classpath         = job.getStringProperty("service_custom_classpath");
+        user              = meta.getStringProperty("user");
+        working_directory = job.getStringProperty("working_directory");
+        log_directory     = job.getStringProperty("log_directory");
+
+        // The JVM arguments arrive as one blank-separated string.
+        jvm_args = rawJvmArgs.split(" ");
+
+        meta_ping_rate      = ServiceManagerComponent.meta_ping_rate;
+        meta_ping_stability = ServiceManagerComponent.meta_ping_stability;
+    }
+
+    /**
+     * Test-only constructor used by main(): loads the properties with
+     * DuccProperties.load(props) and switches this instance into test mode.
+     */
+    CustomServiceMeta(String props) {
+        DuccProperties loaded = new DuccProperties();
+        try {
+            loaded.load(props);
+        } catch (Exception e) {
+            // Best effort: report the failure and carry on with whatever was loaded.
+            e.printStackTrace();
+        }
+
+        String rawJvmArgs = loaded.getStringProperty("service_custom_jvm_args");
+        endpoint   = loaded.getStringProperty("service_custom_endpoint");
+        ping_class = loaded.getStringProperty("service_custom_ping");
+        classpath  = loaded.getStringProperty("service_custom_classpath");
+        jvm_args   = rawJvmArgs.split(" ");
+        test_mode  = true;
+    }
+
+    // Latest statistics object read from the ping process; null until a reply has been read.
+    public ServiceStatistics getServiceStatistics() {
+        return this.service_statistics;
+    }
+
+    /**
+     * Starts the socket listener, spawns the external ping process, and blocks until that
+     * process exits.  If the listen socket or the process cannot be started, the service
+     * set is marked unresponsive and the method returns.
+     */
+    public void run() 
+    {
+        String methodName = "run";
+        try {
+            pinger =  new PingThread();
+        } catch ( Throwable t ) {
+            logger.error(methodName, sset.getId(), "Cannot start listen socket, pinger not started.", t);
+            sset.setUnresponsive();
+            return;
+        }
+        int port = pinger.getPort();
+
+        Thread ping_thread = new Thread(pinger);
+        ping_thread.start();                            // sets up the listener before we start the external process
+
+        ArrayList<String> arglist = new ArrayList<String>();
+        if ( ! test_mode ) {
+            arglist.add(System.getProperty("ducc.agent.launcher.ducc_spawn_path"));
+            arglist.add("-u");
+            arglist.add(user);
+            arglist.add("-w");
+            arglist.add(working_directory);
+            arglist.add("-f");
+            arglist.add(log_directory + "/" + sset.getId() + "/" + sset.getId() + "-CUSTOM_PING");
+            arglist.add("--");
+        }
+
+        arglist.add(System.getProperty("ducc.jvm"));
+        arglist.add("-cp");
+        arglist.add(System.getProperty("java.class.path") + ":" + classpath);
+        for ( String s : jvm_args) {
+            arglist.add(s);
+        }
+        arglist.add("org.apache.uima.ducc.sm.ServicePingMain");   // main class lives in this package (was "...sm.sm...")
+        arglist.add("--class");
+        arglist.add(ping_class);
+        arglist.add("--endpoint");
+        arglist.add(endpoint);
+        arglist.add("--port");
+        arglist.add(Integer.toString(port));
+
+        int i = 0;
+        for ( String s : arglist) {
+            logger.debug(methodName, sset.getId(), "Args[", i++,"]:  ", s);
+        }
+
+        ProcessBuilder pb = new ProcessBuilder(arglist);
+
+        // Establish our pinger and drain its stdout and stderr on their own threads.
+        try {
+            ping_main = pb.start();
+            InputStream stdout = ping_main.getInputStream();
+            InputStream stderr = ping_main.getErrorStream();
+            sin_listener = new StdioListener(1, stdout);
+            ser_listener = new StdioListener(2, stderr);
+            Thread sol = new Thread(sin_listener);
+            Thread sel = new Thread(ser_listener);
+            sol.start();
+            sel.start();
+        } catch (Throwable t) {
+            logger.error(methodName, sset.getId(), "Cannot establish custom ping process:", t);
+            sset.setUnresponsive();
+            return;
+        }
+        
+        int rc;
+        while ( true ) {
+            try {
+                rc = ping_main.waitFor();
+                logger.debug(methodName, sset.getId(), "Pinger returns rc ", rc);
+                break;
+            } catch (InterruptedException e2) {
+                // interrupted while waiting: go round again and keep waiting for the process
+            }
+        }
+
+        pinger.stop();
+        sin_listener.stop();
+        ser_listener.stop();
+    }
+
+    /** Stops the ping thread and stdio listeners and destroys the ping process; parts that were never started are skipped. */
+    public void stop() {
+        if ( pinger       != null ) pinger.stop();
+        if ( sin_listener != null ) sin_listener.stop();
+        if ( ser_listener != null ) ser_listener.stop();
+        if ( ping_main    != null ) ping_main.destroy();
+    }
+
+    class PingThread
+        implements Runnable
+    {
+        ServerSocket server;
+        int port = -1;
+        boolean done = false;
+        int errors =0;
+        int error_threshold = 5;
+
+        PingThread()
+            throws IOException
+        {
+            this.server = new ServerSocket(0);
+            this.port = server.getLocalPort();
+		}
+
+        int getPort()
+        {
+            return this.port;
+        }
+
+        synchronized void stop()
+        {
+            this.done = true;
+        }
+
+        public void run()
+        {
+        	String methodName = "PingThread.run()";
+            try {
+
+                Socket sock = server.accept();
+                // Socket sock = new Socket("localhost", port);
+                sock.setSoTimeout(5000);
+                OutputStream out = sock.getOutputStream();
+                InputStream  in =  sock.getInputStream();
+                ObjectInputStream ois = new ObjectInputStream(in);
+                
+                ping_ok = false;         // we expect the callback to change this
+				while ( true ) {
+                    synchronized(this) {
+                        if ( done ) return;
+                    }
+
+                    if ( errors > error_threshold ) {
+                        stop();
+                    }
+
+                    // Ask for the ping
+                    try {
+                        logger.trace(methodName, sset.getId(), "PingDriver: ping OUT");
+                        out.write('P');
+                        out.flush();
+                    } catch (IOException e1) {
+                        logger.error(methodName, sset.getId(), e1);
+                        errors++;
+                    }
+
+                    // Wait a bit
+                    try {
+                        Thread.sleep(meta_ping_rate);
+                    } catch (InterruptedException e) {
+                        // nothing
+                    }
+                    
+                    // Try to read the response
+                    // TODO: set the socket timeout on this
+                    service_statistics = (ServiceStatistics) ois.readObject();
+                    if ( service_statistics == null ) {
+                        logger.error(methodName, sset.getId(), "Stats are null!");
+                        errors++;
+                    } else {
+                        if ( service_statistics.getPing() ) {
+                            synchronized(this) {
+                                if ( done ) return;
+                                sset.setResponsive();
+                            }
+                            logger.info(methodName, sset.getId(), "Ping ok: ", endpoint, service_statistics.toString());
+                            missed_pings = 0;
+                        } else {
+                            logger.error(methodName, sset.getId(), "Missed_pings ", missed_pings, "endpoint", endpoint);
+                            if ( ++missed_pings > meta_ping_stability ) {
+                                sset.setUnresponsive();
+                                logger.info(methodName, sset.getId(), "Seting state to unresponsive, endpoint",endpoint);
+                            } else {
+                                sset.setWaiting();
+                                logger.info(methodName, sset.getId(), "Seting state to waiting, endpoint,", endpoint);
+                            }                
+                        }
+                    }
+                }
+			} catch (IOException e) {
+                logger.error(methodName, sset.getId(), "Error receiving ping", e);
+                errors++;
+			} catch (ClassNotFoundException e) {
+                logger.error(methodName, sset.getId(), "Input garbled:", e);
+                errors++;
+			}
+        }       
+    }
+
+    /**
+     * Copies each line read from one stdio stream of the ping process to the log (or to System.out in test mode).
+     */
+    class StdioListener implements Runnable
+    {
+        InputStream in;
+        String tag;
+        boolean done = false;
+
+        StdioListener(int which, InputStream in)
+        {
+            this.in = in;
+            switch ( which ) {
+               case 1: tag = "STDOUT: "; break;
+               case 2: tag = "STDERR: "; break;
+            }
+        }
+
+        void stop()
+        {
+            this.done = true;
+        }
+
+        public void run()
+        {
+            if ( done ) return;
+            String methodName = "StdioListener.run";
+            BufferedReader br = new BufferedReader(new InputStreamReader(in));
+            while ( true ) {
+                try {
+                    String s = br.readLine();
+                    if ( s == null ) {       // end of stream: report it and exit without logging a "null" line
+                        String msg = tag + "closed, listener returns";
+                        if ( test_mode ) System.out.println(msg);
+                        else             logger.info(methodName, sset.getId(), msg);
+                        return;
+                    }
+                    if ( test_mode ) System.out.println(tag + s);
+                    else             logger.info(methodName, sset.getId(), tag, s);
+                } catch (IOException e) {
+                    // if anything goes wrong this guy is toast.
+                    if ( test_mode) e.printStackTrace();
+                    else            logger.error(methodName, sset.getId(), e);
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Test driver: args[0] is passed to the test-only constructor, which loads it as properties.
+     * Original note: use a pinger such as org.apache.uima.ducc.sm.PingTester and make sure test.jar is in the classpath.
+     */
+    public static void main(String[] args) {
+        if ( args.length < 1 ) { System.err.println("Usage: CustomServiceMeta <properties>"); return; }
+        CustomServiceMeta csm = new CustomServiceMeta(args[0]);
+        csm.run();
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/CustomServiceMeta.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceManager.java?rev=1427972&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceManager.java Wed Jan  2 19:45:28 2013
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+import org.apache.uima.ducc.transport.event.ServiceModifyEvent;
+import org.apache.uima.ducc.transport.event.ServiceQueryEvent;
+import org.apache.uima.ducc.transport.event.ServiceRegisterEvent;
+import org.apache.uima.ducc.transport.event.ServiceStartEvent;
+import org.apache.uima.ducc.transport.event.ServiceStopEvent;
+import org.apache.uima.ducc.transport.event.ServiceUnregisterEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.sm.ServiceMap;
+
+/**
+ * Operations of the service manager: orchestrator state intake, the service API verbs, and publication.
+ */
+public interface IServiceManager {
+
+    /** Receive the new map and kick the thread to process it. */
+    void orchestratorStateArrives(DuccWorkMap workMap);
+
+    /** Deal with the incoming orchestrator map. */
+    void processIncoming(DuccWorkMap workMap);
+
+    void register(ServiceRegisterEvent ev);
+
+    void unregister(ServiceUnregisterEvent ev);
+
+    void start(ServiceStartEvent ev);
+
+    void stop(ServiceStopEvent ev);
+
+    void query(ServiceQueryEvent ev);
+
+    void modify(ServiceModifyEvent ev);
+
+    // Currently disabled: public SmStateDuccEvent getState();
+
+    void publish(ServiceMap map);
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java?rev=1427972&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java Wed Jan  2 19:45:28 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+import org.apache.uima.ducc.common.ServiceStatistics;
+
+
+interface IServiceMeta extends Runnable {
+    // Statistics object exposed by the implementation.
+    ServiceStatistics getServiceStatistics();
+    // Redeclared from Runnable.
+    void run();
+    void stop();
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceEndpointReader.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceEndpointReader.java?rev=1427972&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceEndpointReader.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceEndpointReader.java Wed Jan  2 19:45:28 2013
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+/**
+ * Use ducc_ling to spawn a process as the user, with the user's classpath.  Read the
+ * process_DD descriptor and infer the jms endpoint of the service.
+ */
+public class ServiceEndpointReader
+{   // Placeholder: the class declares no members yet.
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceEndpointReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1427972&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Wed Jan  2 19:45:28 2013
@@ -0,0 +1,1320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccProperties;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.ServiceModifyEvent;
+import org.apache.uima.ducc.transport.event.ServiceQueryEvent;
+import org.apache.uima.ducc.transport.event.ServiceQueryReplyEvent;
+import org.apache.uima.ducc.transport.event.ServiceReplyEvent;
+import org.apache.uima.ducc.transport.event.ServiceStartEvent;
+import org.apache.uima.ducc.transport.event.ServiceStopEvent;
+import org.apache.uima.ducc.transport.event.ServiceUnregisterEvent;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
+import org.apache.uima.ducc.transport.event.sm.ServiceDependency;
+import org.apache.uima.ducc.transport.event.sm.ServiceDescription;
+import org.apache.uima.ducc.transport.event.sm.ServiceMap;
+
+
+
+public class ServiceHandler
+    implements SmConstants, 
+               Runnable
+{
+    // Logger used by every method in this class.
+    private DuccLogger logger = DuccLogger.getLogger(ServiceHandler.class.getName(), COMPONENT_NAME);	
+    // Receives the service map via publish(); supplied to the constructor.
+    private IServiceManager serviceManager;
+
+    // Holds the known ServiceSets, looked up by service name and by dependent job id.
+    private ServiceStateHandler serviceStateHandler = new ServiceStateHandler();
+	private ServiceMap serviceMap = new ServiceMap();       // note this is the sync object for publish
+
+    // Pending work, split by category.  signalUpdates() adds to these maps and
+    // processUpdates() snapshots and clears them, locking each map while it does so.
+    private HashMap<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>();
+    private HashMap<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>();
+    
+    private HashMap<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>();
+    private HashMap<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>();
+    
+    private HashMap<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>();
+    private HashMap<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>();
+    
+    /**
+     * Creates a handler that publishes its service map through the given manager.
+     *
+     * @param serviceManager the manager whose publish() is called at the end of each
+     *                       processUpdates() pass
+     */
+    public ServiceHandler(IServiceManager serviceManager)
+    {
+        this.serviceManager = serviceManager;        
+    }
+
+    /**
+     * Worker loop: blocks in wait() until signalUpdates() calls notify() on this object,
+     * then runs one processUpdates() pass.  The loop never exits.
+     *
+     * Because the method is synchronized, this object's monitor is held for the whole of
+     * processUpdates() and is only released while blocked in wait().
+     */
+    public synchronized void run()
+    {
+    	String methodName = "run";
+        while ( true ) {
+            try {
+				wait();
+			} catch (InterruptedException e) {
+                // NOTE(review): the interrupt is only logged; the interrupt status is not
+                // restored and the loop carries on into processUpdates() - confirm this
+                // is intended, since the thread cannot be stopped via interrupt().
+				logger.error(methodName, null, e);
+			}
+
+            try {
+                processUpdates();
+            } catch (Throwable t) {
+                // Anything thrown by a handler is logged so the loop survives; the rest of
+                // that processUpdates() pass is skipped.
+                logger.error(methodName, null, t);
+            }
+        }
+    }
+
+    /**
+     * Boot-time only: hands the map of all known active services to every ServiceSet so
+     * each one can bring its internal state in line with the currently published state.
+     */
+    void synchronizeImplementors(Map<DuccId, JobState> servicemap)
+    {
+        for ( String name : serviceStateHandler.getServiceNames() ) {
+            serviceStateHandler.getServiceByName(name).synchronizeImplementors(servicemap);
+        }
+    }
+
+    /**
+     * Runs one update pass.  Each pending-work map is snapshotted and cleared immediately
+     * before its handler runs, in this order: deleted jobs, modified jobs, deleted services,
+     * modified services, new services, new jobs.  The resulting service map is then
+     * published and autostart is enforced on every registered service.
+     */
+    void processUpdates()
+    {
+    	String methodName = "processUpdates";
+        logger.info(methodName, null, "Processing updates.");
+
+        handleDeletedJobs(drainPending(deletedJobs));
+        handleModifiedJobs(drainPending(modifiedJobs));
+        handleDeletedServices(drainPending(deletedServices));
+        handleModifiedServices(drainPending(modifiedServices));
+        handleNewServices(drainPending(newServices));
+        handleNewJobs(drainPending(newJobs));
+
+        // serviceMap doubles as the lock object for publication
+        synchronized(serviceMap) {
+            serviceManager.publish(serviceMap);
+        }
+
+        List<ServiceSet> regsvcs = serviceStateHandler.getRegisteredServices();
+        for ( ServiceSet sset : regsvcs ) {
+            sset.enforceAutostart();
+        }
+    }
+
+    /**
+     * Copies the contents of a pending-work map into a fresh map and empties the original,
+     * holding the original's monitor while doing so.
+     *
+     * @param pending one of the pending-work fields of this handler
+     * @return a private snapshot of what was pending; never null, possibly empty
+     */
+    private HashMap<DuccId, IDuccWork> drainPending(HashMap<DuccId, IDuccWork> pending)
+    {
+        HashMap<DuccId, IDuccWork> snapshot = new HashMap<DuccId, IDuccWork>();
+        synchronized(pending) {
+            snapshot.putAll(pending);
+            pending.clear();
+        }
+        return snapshot;
+    }
+
+    /**
+     * Accepts the incoming OR map, already split into categories, saves its contents into
+     * the pending-work fields and wakes the worker loop in run().
+     *
+     * The incoming maps are volatile, so their contents are copied before returning.
+     *
+     * Each parameter shadows the field of the same name.  The lock must be taken on the
+     * FIELD (this.xxx), because that is the object processUpdates() locks while it
+     * snapshots and clears the map; locking the parameter would leave the field map open
+     * to concurrent modification.
+     */
+    void signalUpdates( // This is the incoming or map, with work split into categories.
+                                     // The incoming maps are volatile - must save contents before returning.
+                                    HashMap<DuccId, IDuccWork> newJobs, 
+                                    HashMap<DuccId, IDuccWork> newServices,                               
+                                    HashMap<DuccId, IDuccWork> deletedJobs,
+                                    HashMap<DuccId, IDuccWork> deletedServices,                                    
+                                    HashMap<DuccId, IDuccWork> modifiedJobs,
+                                    HashMap<DuccId, IDuccWork> modifiedServices
+                                    )
+    {
+
+        synchronized(this.newJobs) {
+            this.newJobs.putAll(newJobs);
+        }
+
+        synchronized(this.newServices) {
+            this.newServices.putAll(newServices);
+        }
+
+        synchronized(this.deletedJobs) {
+            this.deletedJobs.putAll(deletedJobs);
+        }
+
+        synchronized(this.deletedServices) {
+            this.deletedServices.putAll(deletedServices);
+        }
+
+        synchronized(this.modifiedJobs) {
+            this.modifiedJobs.putAll(modifiedJobs);
+        }
+
+        synchronized(this.modifiedServices) {
+            this.modifiedServices.putAll(modifiedServices);
+        }
+
+        // No map lock is held here, so taking this object's monitor cannot deadlock with
+        // run(), which holds the monitor and then takes the map locks.
+        synchronized(this) {
+            notify();
+        }
+    } 
+    
+    /**
+     * Computes the dependency state for job 'id' from the services it depends upon and
+     * stores it in 'dep'.
+     *
+     * The overall state is the lowest-ordinality state among the job's services, starting
+     * from Available; each service's own state is also recorded individually.  If nothing
+     * is recorded for the job the state is NotAvailable.
+     */
+    protected void resolveState(DuccId id, ServiceDependency dep)
+    {        
+        Map<String, ServiceSet> needed = serviceStateHandler.getServicesForJob(id);
+        if ( needed == null ) {
+            // nothing this job needs is available
+            dep.setState(ServiceState.NotAvailable);
+            return;
+        }
+
+        // Begin at the most permissive state and lower it as less permissive ones turn up.
+        ServiceState lowest = ServiceState.Available;
+        for ( ServiceSet svc : needed.values() ) {
+            boolean lower = svc.getServiceState().ordinality() < lowest.ordinality();
+            if ( lower ) {
+                lowest = svc.getServiceState();
+            }
+            dep.setIndividualState(svc.getKey(), svc.getServiceState());
+        }
+        dep.setState(lowest);
+    }
+
+    /**
+     * This is called when an endpoint is referenced as a dependent service from a job or a service.
+     * It is called only when a new job or service is first discovered in the OR map.
+     *
+     * Every dependency is vetted, even after a failure, so that 's' carries a message for
+     * each bad one.
+     *
+     * @param w the job or service whose dependencies are being resolved; callers in this
+     *          class check that getServiceDependencies() is non-null before calling
+     * @param s receives a message and the NotAvailable state for each dependency that fails
+     * @return the services the job now references, keyed by dependency name; empty if any
+     *         dependency was fatal
+     */
+    protected Map<String, ServiceSet> resolveDependencies(DuccWorkJob w, ServiceDependency s)
+    {
+    	String methodName = "resolveDependencies";
+    	DuccId id = w.getDuccId();
+        String[] deps = w.getServiceDependencies();
+
+        // New services, if any are discovered
+        boolean fatal = false;
+        Map<String, ServiceSet> jobServices = new HashMap<String, ServiceSet>();
+        for ( String dep : deps ) {
+            
+            // put it into the global map of known services if needed and up the ref count
+            ServiceSet sset = serviceStateHandler.getServiceByName(dep);                            
+            if ( sset == null ) {                              // first time, so it's by reference only
+                try {
+                    sset = new ServiceSet(dep);
+                    serviceStateHandler.putServiceByName(dep, sset);
+                } catch ( IllegalArgumentException e ) {       // if 'dep' is invalid we throw
+                    s.addMessage(dep, e.getMessage());
+                    s.setState(ServiceState.NotAvailable);
+                    fatal = true;
+                    continue;
+                }
+            }
+
+            if ( sset.isDeregistered() ) { 
+                // Registered services only - the service might even still be alive because it can
+                // take a while to get rid of these guys - we need to be sure we don't attach any
+                // new jobs to it.
+                s.addMessage(dep, "Independent registered service [" + dep + "] has been deregistered and is terminating.");
+                s.setState(ServiceState.NotAvailable);
+                fatal = true;
+                continue;
+            }
+
+            //
+            // We try to vet all services so the message is complete.  If we've already had some fatal problems
+            // we need to bypass any attempt to cope with registered services or updating the sset.
+            //
+            if ( ! fatal ) {
+                if ( sset.isRegistered() && (sset.countImplementors() == 0) ) {
+                    // Registered but not alive, well, we can fix that!
+                    int ninstances = sset.getNInstances();
+                    logger.debug(methodName, sset.getId(), "Reference-starting registered service, instances =", ninstances);
+                    sset.setReferencedStart(true);
+                    for ( int i = 0; i < ninstances; i++ ) {
+                        sset.start();
+                    }
+                }
+                
+                jobServices.put(dep, sset);
+                sset.reference(id);
+                serviceStateHandler.putServiceForJob(w.getDuccId(), sset);
+                logger.debug(methodName, id, "Service init ok. Ref[", dep, "] incr to", sset.countReferences());
+            }
+        }
+
+        // NOTE(review): when a later dependency turns out to be fatal, dependencies vetted
+        // earlier in the loop have already been referenced (and possibly started) and
+        // recorded via putServiceForJob(); only the returned map is cleared here.  Confirm
+        // that those references are released elsewhere.
+        if ( fatal ) {
+            jobServices.clear();            
+        }
+        return jobServices;
+    }
+ 
+    /**
+     * Processes jobs newly seen in the OR map.  Inactive jobs are skipped.  Every other
+     * job gets a ServiceDependency entry: Available when it declares no dependencies,
+     * otherwise the state resolved from the services it depends upon.  The entries are
+     * added to the service map at the end.
+     */
+    protected void handleNewJobs(HashMap<DuccId, IDuccWork> work)
+    { 
+        String methodName = "handleNewJobs";
+
+        // entries destined for the map that is sent to the OR
+        HashMap<DuccId, ServiceDependency> pending = new HashMap<DuccId, ServiceDependency>();
+
+        for ( Map.Entry<DuccId, IDuccWork> entry : work.entrySet() ) {
+            DuccId id = entry.getKey();
+            DuccWorkJob job = (DuccWorkJob) entry.getValue();
+
+            if ( !job.isActive() ) {
+                logger.info(methodName, id, "Bypassing inactive job, state =", job.getStateObject());
+                continue;
+            }
+
+            ServiceDependency depState = new ServiceDependency();
+            pending.put(id, depState);
+
+            if ( job.getServiceDependencies() == null ) {
+                // nothing to wait for
+                depState.setState(ServiceState.Available);
+                logger.info(methodName, id, "Added to map, no service dependencies.");
+            } else {
+                // collect the referenced services and fire up their state machines
+                for ( ServiceSet referenced : resolveDependencies(job, depState).values() ) {
+                    referenced.establish();
+                }
+                resolveState(id, depState);
+                logger.info(methodName, id, "Added job to map, with service dependency state.", depState.getState());
+            }
+        }
+
+        serviceMap.putAll(pending);
+    }
+
+    /**
+     * A job or service has ended.  Common code to release the services it depended upon.
+     *
+     * Each service recorded for 'id' is dereferenced.  When a reference count reaches zero,
+     * an implicit service is removed and a registered, reference-started service is given a
+     * lingering stop.  Finally the record of what 'id' depends upon is dropped.
+     *
+     * @param id - the id of the job or service that stopped
+     */
+    protected void stopDependentServices(DuccId id)
+    {
+    	String methodName = "stopDependentServices";
+
+        Map<String, ServiceSet> dependedOn = serviceStateHandler.getServicesForJob(id);
+        if ( dependedOn == null ) {
+            // already cleaned up - timing issue
+            logger.debug(methodName, id, "No dependent services to stop, returning.");
+            return;
+        }
+
+        for ( Map.Entry<String, ServiceSet> entry : dependedOn.entrySet() ) {
+            String name = entry.getKey();
+            logger.debug(methodName, id, "Looking up service", name);
+
+            ServiceSet sset = entry.getValue();
+            if ( sset == null ) {
+                // sanity check, should never happen
+                throw new IllegalStateException("Null service for " + name);
+            }
+
+            int remaining = sset.dereference(id);             // may also stop the pinger
+            logger.debug(methodName, id, "Ref count for", sset.getKey(), "goes down to", remaining);
+            if ( remaining != 0 ) {
+                continue;
+            }
+
+            if ( sset.isImplicit() ) {
+                logger.debug(methodName, id, "Removing unreferenced implicit service", name, "refcount", remaining);
+                serviceStateHandler.removeService(name);
+            }
+            if ( sset.isRegistered() && sset.isReferencedStart() ) {
+                logger.debug(methodName, id, "Stopping reference-started service", name, "refcount", remaining);
+                sset.lingeringStop();
+            }
+        }
+
+        // finally, 'id' no longer depends on anything
+        serviceStateHandler.removeServicesForJob(id);            
+    }
+
+    /**
+     * Handles jobs that have disappeared from the OR map.  The services each job depended
+     * upon are released, and every job in 'work' is removed from the published service map.
+     */
+    protected void handleDeletedJobs(HashMap<DuccId, IDuccWork> work)
+    {
+        // was "handleCompletedJobs", which attributed these log lines to a method that does not exist
+        String methodName = "handleDeletedJobs";
+
+        for ( DuccId id : work.keySet() ) {
+            DuccWorkJob w = (DuccWorkJob) work.get(id);
+            
+            String[] deps = w.getServiceDependencies();
+            if ( deps == null ) {   // no deps, so there are no references to release
+                logger.info(methodName, id, "No service dependencies, no updates made.");
+                continue;
+            }
+
+            stopDependentServices(id);
+
+            logger.info(methodName, id, "Deleted job from map");
+        }
+
+        serviceMap.removeAll(work.keySet());
+    }
+
+    /**
+     * Handles state changes for jobs already seen.
+     *
+     * Only active jobs have their state re-resolved.  The others will be going away soon
+     * and that time is used as a grace period to keep the management machinery running in
+     * case more work comes in in the next few minutes.  Finished jobs release the services
+     * they depended upon and are marked NotAvailable.
+     *
+     * A job may have no entry in the service map (handleNewJobs() skips jobs that were
+     * inactive when first seen).  That case is tolerated here; otherwise the resulting
+     * NullPointerException would abort the remainder of the processUpdates() pass.
+     */
+    protected void handleModifiedJobs(HashMap<DuccId, IDuccWork> work)
+    {
+        String methodName = "handleModifiedJobs";
+
+        for ( DuccId id : work.keySet() ) {
+            
+            DuccWorkJob j = (DuccWorkJob) work.get(id);
+            String[] deps = j.getServiceDependencies();
+            if ( deps == null ) {   // no deps, so nothing to re-resolve
+                logger.info(methodName, id, "No service dependencies, no updates made.");
+                continue;
+            }
+
+            ServiceDependency s = serviceMap.get(id);
+            if ( j.isFinished() ) {
+                stopDependentServices(id);               // safe even if nothing is recorded for id
+                if ( s != null ) {
+                    s.setState(ServiceState.NotAvailable);
+                }
+            } else  if ( j.isActive() ) {
+                if ( s == null ) {
+                    logger.warn(methodName, id, "Job is not in the service map, state not updated.");
+                    continue;
+                }
+                resolveState(id, s);
+            } 
+        }
+
+    }
+
+    /**
+     * Processes service processes newly seen in the OR map.
+     *
+     * For each active service a ServiceDependency entry is created for the OR.  The
+     * service's endpoint is then matched against the known ServiceSets: an unknown endpoint
+     * creates a new "submitted" ServiceSet, an instance of one of our own services is added
+     * as an implementor, a duplicate of a non-registered service joins it, and a duplicate
+     * of a registered or deregistered service is rejected as NotAvailable.  Finally the
+     * service's own dependencies, if any, are resolved.
+     */
+    protected void handleNewServices(HashMap<DuccId, IDuccWork> work)
+    {
+        String methodName = "handleNewServices";
+
+        Map<DuccId, ServiceDependency>  updates = new HashMap<DuccId, ServiceDependency>();   // to be added to the service map sent to OR
+        // NOTE(review): nothing in this method ever puts into 'newservices', so
+        // recordNewServices() below always receives an empty map - confirm whether it is still needed.
+        Map<String, ServiceSet>     newservices = new HashMap<String, ServiceSet>();          // to be added to our internal maps in serviceState
+        for ( DuccId id : work.keySet() ) {
+            DuccWorkJob w = (DuccWorkJob) work.get(id);
+
+            // On restart we sometimes get stale stuff that we just ignore.
+            // What else? Is this the right thing to do?
+            //  
+            // TODO: For registered services we probably have to call some sort of recovery code
+            if ( !w.isActive() ) {
+                logger.info(methodName, id, "Bypassing inactive service, state=", w.getStateObject());
+                continue;
+            }
+
+            ServiceDependency s = new ServiceDependency();
+            updates.put(id, s);
+
+            String endpoint = w.getServiceEndpoint();
+            if ( endpoint == null ) {                                     // the job is damaged if this happens
+                String msg = "No service endpoint.  Service cannot be validated.";
+                logger.warn(methodName, id, msg);
+                s.addMessage("null", msg);                                // this is a fatal state always
+                s.setState(ServiceState.NotAvailable);
+                continue;
+            }
+
+            String[] deps = w.getServiceDependencies();                  // other services this svc depends on
+            ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
+            if ( sset == null ) {
+                // submitted, we just track but not much else
+                try {
+                    sset = new ServiceSet(id, endpoint, deps);             // creates a "submitted" service
+                    serviceStateHandler.putServiceByName(endpoint, sset);
+                } catch ( IllegalArgumentException e ) {
+                    s.addMessage(endpoint, e.getMessage());
+                    s.setState(ServiceState.NotAvailable);
+                    continue;
+                }
+            } else if ( sset.isDeregistered() ) {
+                s.addMessage(endpoint, "Duplicate endpoint: terminating deregistered service.");
+                s.setState(ServiceState.NotAvailable);
+                continue;
+            } else if ( sset.matches(id) ) {
+                // TODO: not clear we have to do anything here since establish() below will
+                //       add to the implementors.  Be sure to update the check so the
+                //       code in the following 'else' clause is executed correctly though.
+
+                // an instance/implementor of one of our own registered services
+                sset.addImplementor(id, w.getJobState());
+            } else { 
+                //
+                // If the new service is not a registered service, and it is a duplicate of another service
+                // which isn't registered, we allow it to join the party.
+                //
+                // When it joins, it needs to "promote" the ServiceSet to "Submitted".
+                //
+                // a) in the case of "implicit" we don't know enough to make any moral judgements at all
+                // b) in the case of "submitted" it could be the user is increasing the pool of servers by
+                //    submitting more jobs.  Perhaps we would better handle this via modify but for the moment,
+                //    just allow it.
+                // c) in the case of "registered" we know and manage everything and don't allow it.  users must
+                //    use the services modify api to increase or decrease instances.
+                //
+
+                if ( !sset.isRegistered() ) {
+                    sset.addImplementor(id, w.getJobState());
+                    sset.promote();          // we'll do this explicitly as a reminder that it's happening and
+                                             // to ensure we NEVER promote a registered service (which is actually
+                                             // a demotion!).
+                } else {
+                    String msg = "Duplicate endpoint: Registered service.";
+                    logger.warn(methodName, id, msg);
+                    s.addMessage(endpoint, msg);
+                    s.setState(ServiceState.NotAvailable);
+                    continue;
+                }
+            }
+
+            // The service has been accepted if we get this far
+
+            //
+            // No deps.  Put it in the map and move on.
+            //
+            if ( deps == null ) {                
+                logger.info(methodName, id, "Added service to map, no service dependencies. ");
+                s.setState(ServiceState.Available);                        // good to go in the OR (the state of things i'm dependent upon)
+                sset.establish(id, w.getJobState());                       // sets my own state based entirely on state of w
+                continue;
+            }
+
+            // Resolve the services this service itself depends upon and start their state machines
+            Map<String, ServiceSet> jobServices = resolveDependencies(w, s); // 
+            for ( ServiceSet depset : jobServices.values() ) {
+                depset.establish();
+            }
+            resolveState(id, s);
+            sset.establish(id, w.getJobState());
+            logger.info(methodName, id, "Added to map, with service dependencies,", s.getState());
+        }
+
+        serviceStateHandler.recordNewServices(newservices);
+        serviceMap.putAll(updates);
+    }
+
+    /**
+     * The OR reports that these service processes have stopped running, so clean up:
+     * release whatever each one depended upon, drop it as an implementor of its
+     * ServiceSet, and remove all of them from the published service map.
+     */
+    protected void handleDeletedServices(HashMap<DuccId, IDuccWork> work)
+    {
+        String methodName = "handleDeletedServices";
+
+        for ( Map.Entry<DuccId, IDuccWork> entry : work.entrySet() ) {
+            DuccId id = entry.getKey();
+            DuccWorkJob svc = (DuccWorkJob) entry.getValue();
+            String endpoint = svc.getServiceEndpoint();
+            logger.info(methodName, id, "Deleted service:", endpoint);
+
+            // dereference, and maybe stop, the services this one depended upon
+            if ( svc.getServiceDependencies() != null ) {
+                stopDependentServices(id);        // updates references, removes implicit services if any
+            } else {
+                logger.info(methodName, id, "No service dependencies to update on removal.");
+            }
+
+            if ( endpoint == null ) {             // probably impossible, but avoid any chance of an NPE
+                logger.warn(methodName, id, "Missing service endpoint, ignoring.");
+                continue;
+            }
+
+            // The set may already be gone if the service was seen completing and then lingered, which is usual.
+            ServiceSet owner = serviceStateHandler.getServiceByName(endpoint);
+            if ( owner != null ) {
+                owner.removeImplementor(id);      // also stops the ping thread if it's the last one
+            }
+        }
+
+        // lastly, take the deleted services out of the published map
+        serviceMap.removeAll(work.keySet());
+    }
+    
+    /**
+     * Handles state changes for service processes already seen.  Each entry is a specific
+     * service process, not necessarily the whole service.
+     *
+     * A finished process releases the services it depended upon; otherwise its dependency
+     * state is re-resolved.  The owning ServiceSet then factors in the process state, and a
+     * set that is NotAvailable with no references is dropped from the internal maps (unless
+     * it is registered).
+     *
+     * The process may have no entry in the service map (handleNewServices() skips services
+     * that were inactive when first seen).  That case is tolerated here; otherwise the
+     * resulting NullPointerException would abort the remainder of the processUpdates() pass.
+     */
+    protected void handleModifiedServices(HashMap<DuccId, IDuccWork> work)
+    {
+        String methodName = "handleModifiedServices";        
+        
+        for ( DuccId id : work.keySet() ) {
+            DuccWorkJob w = (DuccWorkJob) work.get(id);
+            String endpoint = w.getServiceEndpoint();
+
+            if (endpoint == null ) {              // probably impossible but lets not chance NPE
+                logger.info(methodName, id, "Missing service endpoint, ignoring.");
+                continue;
+            }
+
+            ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
+            if ( sset == null ) {
+                // may have already died and this is just leftover OR publications.
+                if ( w.isActive() ) {             // or maybe we just screwed up!
+                    // Build the exception only to log its stack trace; do not crash SM.
+                    logger.error(methodName, id, new IllegalStateException("Got update for service " + id.toString() + " but no ServiceSet! Job state: " + w.getJobState()));
+                }
+                continue;
+            }
+
+            ServiceDependency s = serviceMap.get(id);
+            if ( w.isFinished() ) {              // nothing more, just dereference and maybe stop stuff I'm dependent upon
+                stopDependentServices(id);
+                if ( s != null ) {
+                    s.setState(ServiceState.NotAvailable);          // tell orchestrator
+                }
+            } else  if ( w.getServiceDependencies() != null ) {     // update state from things I'm dependent upon
+                if ( s != null ) {
+                    resolveState(id, s);
+                } else {
+                    logger.warn(methodName, id, "Service is not in the service map, dependency state not updated.");
+                }
+            } 
+
+            // now factor in cumulative state of the implementors and manage the ping thread as needed
+            sset.establish(id, w.getJobState());
+
+            if ( (sset.getServiceState() == ServiceState.NotAvailable) && (sset.countReferences() == 0) ) {
+                // this service is now toast.  remove from our maps asap to avoid clashes if it gets
+                // resubmitted before the OR can purge it.
+                if ( ! sset.isRegistered() ) {
+                    logger.debug(methodName, id, "Removing service", endpoint, "because it died and has no more references.");
+                    serviceStateHandler.removeService(endpoint);
+                }
+                serviceStateHandler.removeServicesForJob(id);
+            }
+        }
+        
+    }
+
+    /**
+     * Adds the service's dependencies, with their current states, to a query result.
+     *
+     * For a registered service the dependencies come from the registration, because the
+     * service may not be running; an unknown dependency is reported as NotAvailable.
+     * For any other service the dependencies of each implementor are reported instead.
+     */
+    void updateServiceQuery(ServiceDescription sd, ServiceSet sset)
+    {
+        if ( !sset.isRegistered() ) {
+            // not registered: gather what every implementor depends upon
+            Map<DuccId, JobState> implementors = sset.getImplementors();
+            for ( DuccId id : implementors.keySet() ) {
+                Map<String, ServiceSet> needed = serviceStateHandler.getServicesForJob(id);
+                if ( needed == null ) {
+                    continue;
+                }
+                for ( Map.Entry<String, ServiceSet> entry : needed.entrySet() ) {
+                    sd.addDependency(entry.getKey(), entry.getValue().getServiceState().decode());
+                }
+            }
+            return;
+        }
+
+        // registered: the thing may not be running yet, or at all, so use the registration
+        String[] registeredDeps = sset.getIndependentServices();
+        if ( registeredDeps == null ) {
+            return;
+        }
+        for ( String dep : registeredDeps ) {
+            ServiceSet independent = serviceStateHandler.getServiceByName(dep);
+            ServiceState state = (independent == null) ? ServiceState.NotAvailable : independent.getServiceState();
+            sd.addDependency(dep, state.decode());
+        }
+    }
+
+    /**
+     * Answers a service query.
+     *
+     * With friendly == -1 and no endpoint, every known service is described.  Otherwise
+     * only the service identified by the friendly id / endpoint is described.
+     *
+     * @param ev the query; supplies the friendly id and endpoint name
+     * @return a reply with one ServiceDescription per service found; it contains no
+     *         services if the lookup finds nothing
+     */
+    ServiceQueryReplyEvent query(ServiceQueryEvent ev)
+    {
+    	//String methodName = "query";
+        long friendly = ev.getFriendly();
+        String epname = ev.getEndpoint();
+        ServiceQueryReplyEvent reply = new ServiceQueryReplyEvent();
+
+        if (( friendly == -1) && ( epname == null )) {
+            ArrayList<String> keys = serviceStateHandler.getServiceNames();
+            for ( String k : keys ) {
+                ServiceSet sset = serviceStateHandler.getServiceByName(k);
+                // The service may have been removed between listing the names and looking
+                // it up.  The lookup result is what must be checked, not the key.
+                if ( sset == null ) continue;
+                
+                ServiceDescription sd = sset.query();
+                updateServiceQuery(sd, sset);
+                reply.addService(sd);
+            }
+        } else {
+            // start() and stop() treat a null result here as "does not exist"; do the
+            // same rather than fail with a NullPointerException.
+            ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+            if ( sset != null ) {
+                ServiceDescription sd = sset.query();
+                updateServiceQuery(sd, sset);
+                reply.addService(sd);
+            }
+        } 
+
+        return reply;
+    }
+
+    /**
+     * Vets a start request and, if it is acceptable, hands it to an ApiHandler running on
+     * its own thread.
+     *
+     * The request is refused (NOTOK) when the service is unknown, when it is not a
+     * registered service, or when no new instances are wanted.  With no instance count in
+     * the request, "wanted" is the shortfall between registered and running instances;
+     * otherwise it is exactly the requested count.
+     */
+    ServiceReplyEvent start(ServiceStartEvent ev)
+    {
+        long friendly = ev.getFriendly();
+        String epname = ev.getEndpoint();
+        String serviceIdString = extractId(friendly, epname);
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+
+        if ( sset == null ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " does not exist.", null, null);
+        }
+
+        if ( !sset.isRegistered() ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, 
+                                         "Service " + serviceIdString + " is not a registered service.", 
+                                         sset.getKey(), 
+                                         null);    
+        }
+
+        int running    = sset.countImplementors();
+        int instances  = ev.getInstances();
+        int registered = sset.getNInstances();
+        int wanted     = (instances == 0) ? Math.max(0, registered - running) : instances;
+
+        if ( wanted == 0 ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, 
+                                         "Service " + serviceIdString + " is started, instances[" + running + "]: ", 
+                                         sset.getKey(), 
+                                         sset.getId());
+        }
+
+        // something needs starting: do the work off this thread
+        Thread worker = new Thread(new ApiHandler(ev, this));
+        worker.start();
+        return new ServiceReplyEvent(ServiceCode.OK, 
+                                     "Service " + serviceIdString + " start request accepted, new instances[" + wanted + "]", 
+                                     sset.getKey(), 
+                                     sset.getId());
+    }
+
+    //
+    // Everything to do this must be vetted before it is called
+    //
+    // Start with no instance says: start enough new processes to get up the registered amount
+    // Start with some instances says: start exactly this many
+    // If the --save option is included, also update the registration
+    //
+    /**
+     * Starts instances of an already-vetted service.
+     *
+     * @param friendly  numeric service id, used for lookup when epname is null.
+     * @param epname    service endpoint, or null to look up by friendly id.
+     * @param instances number of instances to start; 0 means "start enough to reach the registered count".
+     * @param update    if true, the registered instance count is updated as well.
+     */
+    void doStart(long friendly, String epname, int instances, boolean update)
+    {
+        String methodName = "doStart";
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+
+        if ( sset == null ) {
+            // The request was vetted earlier (see start()), so the service may have been
+            // removed in the meantime.  Nothing is left to start.
+            logger.debug(methodName, null, "Service", extractId(friendly, epname), "no longer exists, start ignored.");
+            return;
+        }
+
+        int running    = sset.countImplementors();
+        int registered = sset.getNInstances();
+        int wanted     = (instances == 0) ? Math.max(0, registered - running) : instances;
+
+        if ( update ) {
+            // NOTE(review): with instances == 0 this records the current running count, not
+            // running + wanted.  Kept as in the original - confirm this is the intended --save semantics.
+            sset.setNInstances(running + instances);
+        }
+
+        for ( int i = 0; i < wanted; i++ ) {
+            sset.start();
+        }
+    }
+
+    /**
+     * Vets a stop request and, if it is acceptable, launches an ApiHandler for the
+     * event on a new thread.
+     *
+     * @param ev the stop event; supplies the friendly id or endpoint and the instance count
+     *           (0 means all running instances).
+     * @return NOTOK when the service is unknown, is not registered, or is a UIMA-AS service
+     *         with no implementors; otherwise OK, which only means that the request was accepted.
+     */
+    ServiceReplyEvent stop(ServiceStopEvent ev)
+    {
+        long friendly = ev.getFriendly();
+        String epname = ev.getEndpoint();
+        String serviceIdString = extractId(friendly, epname);
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+
+        if ( sset == null ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " does not exist.", null, null);
+        }
+
+        if ( ! sset.isRegistered() ) {
+            // Report the id the caller actually used (endpoint or friendly id), as the other replies do.
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " is not a registered service.", sset.getKey(), null);
+        }
+
+        int running = sset.countImplementors();
+        if ( (running == 0) && sset.isUimaAs() ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " is already stopped.", sset.getKey(), sset.getId());
+        }
+
+        // 0 means "everything"; never report more than is actually running.
+        int instances = ev.getInstances();
+        int tolose    = (instances == 0) ? running : Math.min(instances, running);
+
+        ApiHandler apih = new ApiHandler(ev, this);
+        Thread t = new Thread(apih);
+        t.start();
+        return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " stop request accepted for [" + tolose + "] instances.", sset.getKey(), sset.getId());
+    }
+
+    //
+    // Everything to do this must be vetted before it is called
+    //
+    // If instances == 0, stop the whole service
+    // Otherwise we just stop the number asked for
+    // If --save is indicated we update the registry
+    //
+    /**
+     * Stops instances of an already-vetted service.
+     *
+     * @param friendly  numeric service id, used for lookup when epname is null.
+     * @param epname    service endpoint, or null to look up by friendly id.
+     * @param instances number of instances to stop; 0 means all running instances.
+     * @param update    if true, the registered instance count is updated as well.
+     */
+    void doStop(long friendly, String epname, int instances, boolean update)
+    {
+        String methodName = "doStop";
+
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+        if ( sset == null ) {
+            // The request was vetted earlier (see stop()), so the service may have been
+            // removed in the meantime.  Nothing is left to stop.
+            logger.debug(methodName, null, "Service", extractId(friendly, epname), "no longer exists, stop ignored.");
+            return;
+        }
+
+        int running = sset.countImplementors();
+        int tolose  = (instances == 0) ? running : Math.min(instances, running);
+
+        if ( update ) {
+            sset.setNInstances(Math.max(0, running - instances)); // never persist < 0 registered instance
+        }
+
+        if ( tolose == running ) {
+            sset.stop();                                  // blind stop
+        } else {
+            sset.stop(tolose);                            // selective stop (TODO: eventually)
+        }
+    }
+
+    /**
+     * Registers a new service: writes its descriptor and meta-descriptor to disk, checks its
+     * dependencies for cycles, and records it in the service state handler.
+     *
+     * On any failure both files are deleted again and a NOTOK reply carrying the error text
+     * is returned.
+     *
+     * @param id             id of the service being registered.
+     * @param props_filename file the service descriptor is written to.
+     * @param meta_filename  file the meta descriptor is written to.
+     * @param props          service descriptor properties.
+     * @param meta           meta descriptor properties.
+     * @return OK when the service was recorded, NOTOK with an explanation otherwise.
+     */
+    ServiceReplyEvent register(DuccId id, String props_filename, String meta_filename, DuccProperties props, DuccProperties meta)
+    {
+        String methodName = "register";
+        ServiceSet sset = new ServiceSet(id, props_filename, meta_filename, props, meta);
+        String key = sset.getKey();
+
+        String error = null;
+        if ( serviceStateHandler.getServiceByName(key) != null ) {
+            error = ("Duplicate service: " + key + ".  Registration fails");
+        }
+
+        if ( error == null ) {
+            try {
+                // store() leaves the stream open, so it must be closed here.
+                FileOutputStream fos = new FileOutputStream(props_filename);
+                try {
+                    props.store(fos, "Service descriptor.");
+                } finally {
+                    fos.close();
+                }
+            } catch ( Exception e ) {
+                error = ("Internal error; unable to store service descriptor. " + key);
+                logger.error(methodName, id, e);
+            }
+        }
+
+        if ( error == null ) {
+            try {
+                FileOutputStream fos = new FileOutputStream(meta_filename);
+                try {
+                    meta.store(fos, "Meta descriptor");
+                } finally {
+                    fos.close();
+                }
+            } catch ( Exception e ) {
+                error = ("Internal error; unable to store service meta-descriptor. " + key);
+                logger.error(methodName, id, e);
+            }
+        }
+
+        // must check for cycles or we can deadlock
+        if ( error == null ) {
+            CycleChecker cc = new CycleChecker(sset);
+            if ( cc.hasCycle() ) {
+                error = ("Service dependencies contain a cycle with " + cc.getCycles());
+                logger.error(methodName, id, error);
+            }
+        }
+
+        if ( error == null ) {
+            serviceStateHandler.putServiceByName(sset.getKey(), sset);
+            return new ServiceReplyEvent(ServiceCode.OK, "Registered service.", key, id);
+        }
+
+        // Roll back whatever may have been written.
+        // NOTE(review): this also runs for the duplicate case - confirm that the file names can
+        // never be those of the already-registered service.
+        File mf = new File(meta_filename);
+        mf.delete();
+
+        File pf = new File(props_filename);
+        pf.delete();
+        return new ServiceReplyEvent(ServiceCode.NOTOK, error, key, id);
+    }
+
+    /**
+     * Vets a modify request and, if it is acceptable, launches an ApiHandler for the
+     * event on a new thread.
+     *
+     * @param ev the modify event; supplies the friendly id or endpoint of the service.
+     * @return NOTOK when the service is unknown or is not registered; otherwise OK, which
+     *         only means that the request was accepted.
+     */
+    public ServiceReplyEvent modify(ServiceModifyEvent ev)
+    {
+        long friendly = ev.getFriendly();
+        String epname = ev.getEndpoint();
+        String serviceIdString = extractId(friendly, epname);
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+
+        // An unknown service must produce a reply, not a NullPointerException.
+        if ( sset == null ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " does not exist.", null, null);
+        }
+
+        if ( ! sset.isRegistered() ) {
+            // Report the id the caller actually used (endpoint or friendly id).
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " is not a registered service.", sset.getKey(), null);
+        }
+
+        ApiHandler apih = new ApiHandler(ev, this);
+        Thread t = new Thread(apih);
+        t.start();
+        return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " modify request accepted.", sset.getKey(), sset.getId());
+    }
+
+    /**
+     * Applies a modification to an already-vetted service.
+     *
+     * @param friendly  numeric service id, used for lookup when epname is null.
+     * @param epname    service endpoint, or null to look up by friendly id.
+     * @param instances new registered instance count; a negative value leaves the count unchanged.
+     * @param autostart new autostart setting; Trinary.Unset leaves it unchanged.
+     * @param activate  if true, instances are started or stopped so that the running count
+     *                  matches the target count.
+     */
+    void doModify(long friendly, String epname, int instances, Trinary autostart, boolean activate)
+    {
+        String methodName = "doModify";
+
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+        if ( sset == null ) {
+            // The request was vetted earlier (see modify()), so the service may have been
+            // removed in the meantime.
+            logger.debug(methodName, null, "Service", extractId(friendly, epname), "no longer exists, modify ignored.");
+            return;
+        }
+
+        if ( instances >= 0 ) {
+            sset.setNInstances(instances);                // also persists instances
+        }
+
+        if ( autostart != Trinary.Unset ) {
+            sset.setAutostart(autostart.decode());
+        }
+
+        if ( activate ) {
+            // A negative count means "not specified".  Reconcile against the registered count
+            // in that case; using the negative value directly would stop every running instance.
+            int target  = (instances >= 0) ? instances : sset.getNInstances();
+            int running = sset.countImplementors();
+            int diff    = target - running;
+
+            if ( diff > 0 ) {
+                for ( int i = 0; i < diff; i++ ) {
+                    sset.start();
+                }
+            } else if ( diff < 0 ) {
+                sset.stop(-diff);
+            }
+        }
+    }
+
+    /**
+     * Vets an unregister request.  When the service exists and is registered, it is flagged
+     * as deregistered and an ApiHandler for the event is launched on a new thread.
+     *
+     * @param ev the unregister event; supplies the friendly id or endpoint of the service.
+     * @return NOTOK when the service is unknown or is not registered, OK otherwise.
+     */
+    public ServiceReplyEvent unregister(ServiceUnregisterEvent ev)
+    {
+        long friendly = ev.getFriendly();
+        String epname = ev.getEndpoint();
+        String serviceIdString = extractId(friendly, epname);
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+
+        if ( sset == null ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " does not exist.",  serviceIdString, null);
+        }
+
+        if ( ! sset.isRegistered() ) {
+            return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " is not a registered service.", sset.getKey(), null);            
+        }
+
+        // Per the original note: this just sets a flag so we know how to handle the service when it starts to die.
+        sset.deregister();
+
+        new Thread(new ApiHandler(ev, this)).start();
+        return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " unregistered. Shutting down implementors.", sset.getKey(), sset.getId());
+    }
+
+    //
+    // Everything to do this must be vetted before it is called. Run in a new thread to not hold up the API.
+    //
+    /**
+     * Completes the unregistration of an already-vetted service.  Running implementors are
+     * stopped; a service with no implementors is removed from the state handler at once.
+     * In both cases the descriptor files are deleted.
+     *
+     * @param friendly numeric service id, used for lookup when epname is null.
+     * @param epname   service endpoint, or null to look up by friendly id.
+     */
+    void doUnregister(long friendly, String epname)
+    {
+        String methodName = "doUnregister";
+        ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
+
+        if ( sset == null ) {
+            // A concurrent unregister may already have removed the service.
+            logger.debug(methodName, null, "Service", extractId(friendly, epname), "no longer exists, unregister ignored.");
+            return;
+        }
+
+        if ( sset.countImplementors() > 0 ) {
+            logger.debug(methodName, sset.getId(), "Stopping implementors:", friendly, epname);
+            sset.stop();
+        } else {
+            logger.debug(methodName, sset.getId(), "Removing from map:", friendly, epname);
+            serviceStateHandler.removeService(epname, friendly);
+        }
+
+        String metafn  = sset.getMetaFilename();
+        String propsfn = sset.getPropsFilename();
+
+        // Either file name may be null, so delete only what has a name.
+        if ( metafn != null ) {
+            File mf = new File(metafn);
+            mf.delete();
+        }
+        if ( propsfn != null ) {
+            File pf = new File(propsfn);
+            pf.delete();
+        }
+    }
+
+    /**
+     * Picks the identifier to show to the user: the endpoint when one was given, otherwise
+     * the friendly id rendered as a decimal string.
+     *
+     * @param friendly numeric service id.
+     * @param epname   service endpoint, may be null.
+     * @return epname if it is non-null, else the string form of friendly.
+     */
+    String extractId(long friendly, String epname)
+    {
+        if ( epname != null ) {
+            return epname;
+        }
+        return Long.toString(friendly);
+    }
+
+    /**
+     * From: http://en.wikipedia.org/wiki/Topological_sorting
+     *
+     * L <- Empty list that will contain the sorted elements
+     * S <- Set of all nodes with no incoming edges
+     * while S is non-empty do
+     *     remove a node n from S
+     *     insert n into L
+     *     for each node m with an edge e from n to m do
+     *         remove edge e from the graph
+     *         if m has no other incoming edges then
+     *             insert m into S
+     * if graph has edges then
+     *     return error (graph has at least one cycle)
+     * else 
+     *     return L (a topologically sorted order)
+     */
+    class CycleChecker
+    {
+        ServiceSet sset;
+        int edges = 0;
+        List<String> cycles = null;
+
+        CycleChecker(ServiceSet sset)
+        {
+            this.sset = sset;
+        }
+
+        boolean hasCycle()
+        {
+            // Start by building the dependency graph
+            // TODO: Maybe consider saving this.  Not clear there's much of a
+            //       gain doing the extra bookeeping beause the graphs will always
+            //       be small and will only need checking on registration or arrival
+            //       of a submitted service.  So this cycle checking is always
+            //       fast anyway.
+            //
+            //       Bookeeping could be a bit ugly because a submitted service could
+            //       bop in and change some dependency graph.  We really only care
+            //       for checking cycles, so we'll check the cycles as things change
+            //       and then forget about it.
+            //
+            String[] deps = sset.getIndependentServices();
+            if ( deps == null ) return false;          // man, that was fast!
+
+            Map<String, ServiceSet> visited = new HashMap<String, ServiceSet>();     // all the nodes in the graph
+            clearEdges(sset, visited);
+
+            List<ServiceSet> nodes = new ArrayList<ServiceSet>();
+            nodes.addAll(visited.values());
+            buildGraph(nodes);
+            
+            List<ServiceSet>        sorted = new ArrayList<ServiceSet>();          // topo-sorted list of nodes
+            List<ServiceSet>        current = new ArrayList<ServiceSet>();         // nodes with no incoming edges
+
+            // Constant: current has all nodes with no incoming edges
+            for ( ServiceSet node : nodes ) {
+                if ( ! node.hasPredecessor() ) current.add(node);
+            }
+
+            while ( current.size() > 0 ) {
+                ServiceSet next = current.remove(0);                            // remove a node n from S
+                sorted.add(next);                                               // insert n int L
+                List<ServiceSet> successors = next.getSuccessors();
+                for ( ServiceSet succ : successors ) {                          // for each node m(pred) with an edge e from n to m do
+                    next.removeSuccessor(succ);                                 // remove edge from graph
+                    succ.removePredecessor(next);                               //    ...
+                    edges--;
+                    if ( !succ.hasPredecessor() ) current.add(succ);            // if m(pred) has no incoming edges insert m into S
+                }
+            }
+
+            if ( edges == 0 ) return false;                                     // if graph has no edges, no cycles
+
+            cycles = new ArrayList<String>();                                   // oops, and here they are
+            for ( ServiceSet node : nodes ) {
+                if ( node.hasSuccessor() ) {
+                    for ( ServiceSet succ : node.getSuccessors() ) {
+                        cycles.add(node.getKey() + " -> " + succ.getKey());
+                    }
+                }
+            }
+            return true;
+        }
+
+        String getCycles()
+        {
+            return cycles.toString();
+        }
+        
+        //
+        // Traveerse the graph and make sure all the nodes are "clean" 
+        //
+        void clearEdges(ServiceSet node, Map<String, ServiceSet> visited)
+        {
+            String key = node.getKey();
+            node.clearEdges();
+            if ( visited.containsKey(key) ) return;
+
+            visited.put(node.getKey(), node);
+            String[] deps = node.getIndependentServices();
+            if ( deps == null ) return;
+            
+            for ( String dep : deps ) {
+                ServiceSet sset = serviceStateHandler.getServiceByName(dep);
+                clearEdges(sset, visited);
+            }
+        }
+            
+        void buildGraph(List<ServiceSet> nodes)
+        {            
+            for ( ServiceSet node : nodes ) {           
+                String[] deps = node.getIndependentServices();           // never null if we get this far
+                if ( deps != null ) {
+                    for ( String d : deps ) {
+                        ServiceSet outgoing = serviceStateHandler.getServiceByName(d);
+                        outgoing.setIncoming(node);
+                        node.setOutgoing(outgoing);
+                        edges++;
+                    }
+                }
+            }
+        }
+    }
+
+    class ServiceStateHandler
+    {
+
+        // Map of active service descriptors by endpoint.  For UIMA services, key is the endpoint.
+        private HashMap<String,  ServiceSet>  servicesByName     = new HashMap<String,  ServiceSet>();
+        private HashMap<Long,    ServiceSet>  servicesByFriendly = new HashMap<Long,    ServiceSet>();
+
+        // For each job, the collection of services it is dependent upon
+        // DUccId is a Job Id (or id for serice that has dependencies)
+        private HashMap<DuccId, Map<String, ServiceSet>>  servicesByJob = new HashMap<DuccId, Map<String, ServiceSet>>();
+
+        ServiceStateHandler()
+        {
+        }
+
+        /**
+         * Return a copy of the keys so we can fetch the services in an orderly manner.
+         */
+        synchronized ArrayList<String> getServiceNames()
+        {
+            ArrayList<String> answer = new ArrayList<String>();
+            for ( String k : servicesByName.keySet() ) {
+                answer.add(k);
+            }
+            return answer;
+        }
+
+        synchronized ServiceSet getServiceByName(String n)
+        {
+            return servicesByName.get(n);
+        }
+
+        synchronized ServiceSet getServiceByFriendly(long id)
+        {
+            return servicesByFriendly.get( id );
+        }
+
+        // API passes in a friendly (maybe) and an endpiont (maybe) but only one of these
+        // Here we look up the service by whatever was passed in.
+        synchronized ServiceSet  getServiceForApi(long id, String n)
+        {
+            if ( n == null ) return getServiceByFriendly(id);
+            return getServiceByName(n);
+        }
+
+        synchronized List<ServiceSet> getRegisteredServices()
+        {
+            ArrayList<ServiceSet> answer = new ArrayList<ServiceSet>();
+            for ( ServiceSet sset : servicesByName.values() ) {
+                if ( sset.isRegistered() ) {
+                    answer.add(sset);
+                }
+            }
+            return answer;
+        }
+
+        synchronized void putServiceByName(String n, ServiceSet s)
+        {
+            servicesByName.put(n, s);
+            DuccId id = s.getId();
+            if ( id != null ) {
+                servicesByFriendly.put(id.getFriendly(), s);
+            }
+        }
+
+        synchronized ServiceSet removeService(String n)
+        {
+            ServiceSet s = servicesByName.remove(n);
+            if ( s != null ) {
+                DuccId id = s.getId();
+                if ( id != null ) {
+                    servicesByFriendly.remove(id.getFriendly());
+                }
+            }
+            return s;
+        }
+
+        synchronized void removeService(long id)
+        {
+            ServiceSet sset = servicesByFriendly.remove(id);
+            if ( sset != null ) {
+                String key = sset.getKey();
+                servicesByName.remove(key);
+            }
+        }
+
+        synchronized void removeService(String n, long id)
+        {
+            if ( n == null ) removeService(id);
+            else             removeService(n);
+        }
+
+        synchronized Map<String, ServiceSet> getServicesForJob(DuccId id)
+        {
+
+        	return servicesByJob.get(id);
+        }
+
+        synchronized void putServiceForJob(DuccId id, ServiceSet s)
+        {
+            Map<String, ServiceSet> services = servicesByJob.get(id);
+            if ( services == null ) {
+                services = new HashMap<String, ServiceSet>();
+                servicesByJob.put(id, services);
+            }
+            services.put(s.getKey(), s);
+        }
+
+        synchronized void removeServicesForJob(DuccId id)
+        {
+            servicesByJob.remove(id);
+        }
+
+        synchronized void recordNewServices(Map<String, ServiceSet> services) 
+        {
+            servicesByName.putAll(services);
+        }
+
+    }
+
+    /**
+     * Tester for topo sorter.
+     * Input is props file, e.g. for the graph:
+     *   A -> B, A -> C, B -> C:
+     *
+     *    services = A B C
+     *    svc.A = B C
+     *    svc.B = C
+     *    svc.C = 
+     *
+     */
+    /**
+     * Builds a service graph from a properties file and prints whether a dependency cycle
+     * is reachable from the first service created.
+     *
+     * @param propsfile properties file with a "services" entry and one "svc.NAME" entry per service.
+     */
+    private void runSortTester(String propsfile)
+    {
+        int friendly = 1;
+        DuccProperties props = new DuccProperties();
+        try {
+            props.load(propsfile);
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+
+        String svcnames = props.getStringProperty("services");
+        if ( svcnames == null ) {
+            System.out.println("No 'services' property found in " + propsfile);
+            return;
+        }
+
+        // A list, not an array sized from "services": dependencies that are not listed there
+        // create services as well.
+        List<ServiceSet> allServices = new ArrayList<ServiceSet>();
+
+        // Split on runs of whitespace so that repeated blanks do not produce empty names.
+        for ( String svc : svcnames.trim().split("\\s+") ) {
+            if ( svc.equals("") ) continue;
+
+            String key = "UIMA-AS:" + svc + ":tcp://foo:123";
+            ServiceSet dep = serviceStateHandler.getServiceByName(key);
+            if ( dep == null ) {
+                dep = new ServiceSet(new DuccId(friendly++), key, null);
+                serviceStateHandler.putServiceByName(key, dep);
+                allServices.add(dep);
+            }
+
+            String depnames = props.getStringProperty("svc." + svc);
+            if ( depnames == null ) continue;             // no entry for this service: no dependencies
+
+            List<String> subdeps = new ArrayList<String>();
+            for ( String subsvc : depnames.trim().split("\\s+") ) {
+                if ( subsvc.equals("") ) continue;
+
+                String subkey = "UIMA-AS:" + subsvc + ":tcp://foo:123";
+                ServiceSet subdep = serviceStateHandler.getServiceByName(subkey);
+                if ( subdep == null ) {
+                    subdep = new ServiceSet(new DuccId(friendly++), subkey, null);
+                    serviceStateHandler.putServiceByName(subkey, subdep);
+                    allServices.add(subdep);
+                }
+                subdeps.add(subkey);
+            }
+            if ( subdeps.size() > 0 ) {
+                dep.setIndependentServices(subdeps.toArray(new String[subdeps.size()]));
+            }
+        }
+
+        if ( allServices.isEmpty() ) {
+            System.out.println("No services defined in " + propsfile);
+            return;
+        }
+
+        CycleChecker cc = new CycleChecker(allServices.get(0));
+        if ( cc.hasCycle() ) {
+            System.out.println("Service dependencies contain a cycle with " + cc.getCycles());
+        } else {
+            System.out.println("No cycles detected");
+        }
+    }
+
+    // tester for the topo sorter
+    /**
+     * Command-line entry point for the topo sorter tester.
+     *
+     * @param args args[0] is the properties file describing the service graph.
+     */
+    public static void main(String[] args)
+    {
+        if ( (args == null) || (args.length < 1) ) {
+            System.err.println("Usage: ServiceHandler <properties-file>");
+            System.exit(1);
+            return;
+        }
+
+        ServiceHandler sh = new ServiceHandler(null);
+        sh.runSortTester(args[0]);
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native