You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/15 18:37:20 UTC
svn commit: r1433548 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm:
PingDriver.java ServiceHandler.java ServiceManagerComponent.java
ServicePingMain.java ServiceSet.java TcpStreamHandler.java UimaAsPing.java
Author: cwiklik
Date: Tue Jan 15 17:37:20 2013
New Revision: 1433548
URL: http://svn.apache.org/viewvc?rev=1433548&view=rev
Log:
UIMA-2577 committing for jim
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java (with props)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java?rev=1433548&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java Tue Jan 15 17:37:20 2013
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+
+import org.apache.uima.ducc.common.ServiceStatistics;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccProperties;
+
+
+/**
+ * This runs the watchdog thread for custom service pingers.
+ *
+ * It spawns a process, as the user, which in turn will instantiate an object which extends
+ * AServiceMeta to implement the pinger.
+ *
+ * The processes communicate via a pipe: every ping interval the meta puts relevant information onto its
+ * stdout:
+ * 0|1 long long
+ * The first token is 1 if the ping succeeded, 0 otherwise.
+ * The second token is the total cumulative work executed by the service.
+ * The third token is the current queue depth of the service.
+ */
+
+class PingDriver
+ implements IServiceMeta,
+ SmConstants
+{
+ private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);
+
+ String[] jvm_args;
+ String endpoint;
+ String ping_class;
+ String classpath;
+ boolean ping_ok;
+ int missed_pings = 0;
+ ServiceSet sset;
+ boolean test_mode = false;
+
+ Process ping_main;
+
+ StdioListener sin_listener = null;
+ StdioListener ser_listener = null;
+ PingThread pinger = null;
+
+ int meta_ping_rate;
+ int meta_ping_stability;
+ String meta_ping_timeout;
+ ServiceStatistics service_statistics = null;
+
+ String user;
+ String working_directory;
+ String log_directory;
+
+ boolean do_log = true;
+
+ PingDriver(ServiceSet sset)
+ {
+ this.sset = sset;
+ DuccProperties job_props = sset.getJobProperties();
+ DuccProperties meta_props = sset.getMetaProperties();
+
+ this.endpoint = meta_props.getStringProperty("endpoint");
+ this.user = meta_props.getStringProperty("user");
+ String jvm_args_str = job_props.getStringProperty("service_ping_jvm_args", "");
+ this.ping_class = job_props.getStringProperty("service_ping_class");
+ this.meta_ping_timeout = job_props.getStringProperty("service_ping_timeout");
+ this.do_log = job_props.getBooleanProperty("service_ping_dolog", true);
+ this.classpath = job_props.getStringProperty("service_ping_classpath");
+ this.working_directory = job_props.getStringProperty("working_directory");
+ this.log_directory = job_props.getStringProperty("log_directory");
+
+ jvm_args_str = jvm_args_str + " -Dducc.sm.meta.ping.timeout=" + meta_ping_timeout;
+ jvm_args_str = jvm_args_str.trim();
+ jvm_args = jvm_args_str.split("\\s+");
+
+ this.meta_ping_rate = ServiceManagerComponent.meta_ping_rate;
+ this.meta_ping_stability = ServiceManagerComponent.meta_ping_stability;
+ }
+
+ /**
+ * Test from main only
+ */
+ PingDriver(String props)
+ {
+ DuccProperties dp = new DuccProperties();
+ try {
+ dp.load(props);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ this.endpoint = dp.getStringProperty("endpoint");
+ String jvm_args_str = dp.getStringProperty("service_ping_jvm_args", "");
+ this.ping_class = dp.getStringProperty("service_ping_ping");
+ this.classpath = dp.getStringProperty("service_ping_classpath");
+ jvm_args = jvm_args_str.split(" ");
+ this.test_mode = true;
+ }
+
+ public ServiceStatistics getServiceStatistics()
+ {
+ return service_statistics;
+ }
+
+ public void run()
+ {
+ String methodName = "run";
+ try {
+ pinger = new PingThread();
+ } catch ( Throwable t ) {
+ logger.error(methodName, sset.getId(), "Cannot start listen socket, pinger not started.", t);
+ sset.setUnresponsive();
+ return;
+ }
+ int port = pinger.getPort();
+
+ Thread ping_thread = new Thread(pinger);
+ ping_thread.start(); // sets up the listener, before we start the external process
+
+ ArrayList<String> arglist = new ArrayList<String>();
+ if ( ! test_mode ) {
+ arglist.add(System.getProperty("ducc.agent.launcher.ducc_spawn_path"));
+ arglist.add("-u");
+ arglist.add(user);
+ arglist.add("-w");
+ arglist.add(working_directory);
+ if ( do_log ) {
+ arglist.add("-f");
+ arglist.add(log_directory + "/services/ping/" + sset.getId());
+ }
+ arglist.add("--");
+ }
+
+ arglist.add(System.getProperty("ducc.jvm"));
+ for ( String s : jvm_args) {
+ arglist.add(s);
+ }
+ arglist.add("-cp");
+ arglist.add(System.getProperty("java.class.path") + ":" + classpath);
+ arglist.add("org.apache.uima.ducc.sm.ServicePingMain");
+ arglist.add("--class");
+ arglist.add(ping_class);
+ arglist.add("--endpoint");
+ arglist.add(endpoint);
+ arglist.add("--port");
+ arglist.add(Integer.toString(port));
+
+ int i = 0;
+ for ( String s : arglist) {
+ logger.debug(methodName, sset.getId(), "Args[", i++,"]: ", s);
+ }
+
+ ProcessBuilder pb = new ProcessBuilder(arglist);
+
+ //
+ // Establish our pinger
+ //
+ InputStream stdout = null;
+ InputStream stderr = null;
+ try {
+ ping_main = pb.start();
+ stdout = ping_main.getInputStream();
+ stderr = ping_main.getErrorStream();
+
+ sin_listener = new StdioListener(1, stdout);
+ ser_listener = new StdioListener(2, stderr);
+ Thread sol = new Thread(sin_listener);
+ Thread sel = new Thread(ser_listener);
+ sol.start();
+ sel.start();
+ } catch (Throwable t) {
+ logger.error(methodName, sset.getId(), "Cannot establish ping process:", t);
+ sset.setUnresponsive();
+ return;
+ }
+
+ int rc;
+ while ( true ) {
+ try {
+ rc = ping_main.waitFor();
+ logger.debug(methodName, sset.getId(), "Pinger returns rc ", rc);
+ sset.pingExited();
+ break;
+ } catch (InterruptedException e2) {
+ // nothing
+ }
+ }
+
+ pinger.stop();
+ sin_listener.stop();
+ ser_listener.stop();
+ }
+
+ public void stop()
+ {
+ pinger.stop();
+ sin_listener.stop();
+ ser_listener.stop();
+ ping_main.destroy();
+ }
+
+ class PingThread
+ implements Runnable
+ {
+ ServerSocket server;
+ int port = -1;
+ boolean done = false;
+ int errors =0;
+ int error_threshold = 5;
+
+ PingThread()
+ throws IOException
+ {
+ this.server = new ServerSocket(0);
+ this.port = server.getLocalPort();
+ }
+
+ int getPort()
+ {
+ return this.port;
+ }
+
+ synchronized void stop()
+ {
+ this.done = true;
+ }
+
+ public void run()
+ {
+ String methodName = "PingThread.run()";
+ try {
+
+ Socket sock = server.accept();
+ // Socket sock = new Socket("localhost", port);
+ sock.setSoTimeout(5000);
+ OutputStream out = sock.getOutputStream();
+ InputStream in = sock.getInputStream();
+ ObjectInputStream ois = new ObjectInputStream(in);
+
+ ping_ok = false; // we expect the callback to change this
+ while ( true ) {
+ synchronized(this) {
+ if ( done ) return;
+ }
+
+ if ( errors > error_threshold ) {
+ stop();
+ }
+
+ // Ask for the ping
+ try {
+ logger.trace(methodName, sset.getId(), "PingDriver: ping OUT");
+ out.write('P');
+ out.flush();
+ } catch (IOException e1) {
+ logger.error(methodName, sset.getId(), e1);
+ errors++;
+ }
+
+ // Wait a bit
+ try {
+ Thread.sleep(meta_ping_rate);
+ } catch (InterruptedException e) {
+ // nothing
+ }
+
+ // Try to read the response
+ // TODO: set the socket timeout on this
+ service_statistics = (ServiceStatistics) ois.readObject();
+ if ( service_statistics == null ) {
+ logger.error(methodName, sset.getId(), "Stats are null!");
+ errors++;
+ } else {
+ if ( service_statistics.isAlive() ) {
+ synchronized(this) {
+ if ( done ) return;
+ sset.setResponsive();
+ }
+ logger.info(methodName, sset.getId(), "Ping ok: ", endpoint, service_statistics.toString());
+ missed_pings = 0;
+ } else {
+ logger.error(methodName, sset.getId(), "Missed_pings ", missed_pings, "endpoint", endpoint);
+ if ( ++missed_pings > meta_ping_stability ) {
+ sset.setUnresponsive();
+ logger.info(methodName, sset.getId(), "Seting state to unresponsive, endpoint",endpoint);
+ } else {
+ sset.setWaiting();
+ logger.info(methodName, sset.getId(), "Seting state to waiting, endpoint,", endpoint);
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ logger.error(methodName, sset.getId(), "Error receiving ping", e);
+ errors++;
+ } catch (ClassNotFoundException e) {
+ logger.error(methodName, sset.getId(), "Input garbled:", e);
+ errors++;
+ }
+ }
+ }
+
+ class StdioListener
+ implements Runnable
+ {
+ InputStream in;
+ String tag;
+ boolean done = false;
+
+ StdioListener(int which, InputStream in)
+ {
+ this.in = in;
+ switch ( which ) {
+ case 1: tag = "STDOUT: "; break;
+ case 2: tag = "STDERR: "; break;
+ }
+ }
+
+ void stop()
+ {
+ this.done = true;
+ }
+
+ public void run()
+ {
+ if ( done ) return;
+ String methodName = "StdioListener.run";
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(in));
+ while ( true ) {
+ try {
+ String s = br.readLine();
+ if ( test_mode ) System.out.println(tag + s);
+ else logger.info(methodName, sset.getId(), tag, s);
+
+ if ( s == null ) {
+ String msg = tag + "closed, listener returns";
+ if ( test_mode ) System.out.println(msg);
+ else logger.info(methodName, sset.getId(), msg);
+ return;
+ }
+
+ } catch (IOException e) {
+ // if anything goes wrong this guy is toast.
+ if ( test_mode) e.printStackTrace();
+ else logger.error(methodName, sset.getId(), e);
+ return;
+ }
+ }
+
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ // arg0 = amqurl = put into -Dbroker.url
+ // arg1 = endpoint - pass to ServicePingMain
+ // call ServicePingMain --class org.apache.uima.ducc.sm.PingTester --endpoint FixedSleepAE_1
+ // make sure test.jar is in the classpath
+ PingDriver csm = new PingDriver(args[0]);
+ csm.run();
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Tue Jan 15 17:37:20 2013
@@ -132,6 +132,7 @@ public class ServiceHandler
modifiedServices.clear();
}
handleModifiedServices(incoming);
+ handleImplicitServices();
incoming = new HashMap<DuccId, IDuccWork>();
synchronized(newServices) {
@@ -202,6 +203,7 @@ public class ServiceHandler
*/
protected void resolveState(DuccId id, ServiceDependency dep)
{
+ String methodName = "resolveState";
Map<String, ServiceSet> services = serviceStateHandler.getServicesForJob(id);
if ( services == null ) {
dep.setState(ServiceState.NotAvailable); // says that nothing i need is available
@@ -218,6 +220,7 @@ public class ServiceHandler
for ( ServiceSet sset : services.values() ) {
if ( sset.getServiceState().ordinality() < state.ordinality() ) state = sset.getServiceState();
dep.setIndividualState(sset.getKey(), sset.getServiceState());
+ logger.debug(methodName, id, "Set individual state", sset.getServiceState());
}
dep.setState(state);
}
@@ -266,7 +269,7 @@ public class ServiceHandler
// we need to bypass any attempt to cope with registered services or updating the sset.
//
if ( ! fatal ) {
- if ( sset.isRegistered() && (sset.countImplementors() == 0) ) {
+ if ( sset.isRegistered() && (sset.countImplementors() == 0) && sset.isStartable() ) {
// Registered but not alive, well, we can fix that!
int ninstances = sset.getNInstances();
logger.debug(methodName, sset.getId(), "Reference-starting registered service, instances =", ninstances);
@@ -419,6 +422,7 @@ public class ServiceHandler
if ( j.isFinished() ) {
stopDependentServices(id);
s.setState(ServiceState.NotAvailable);
+ s.clearMessages();
} else if ( j.isActive() ) {
resolveState(id, s);
}
@@ -572,6 +576,22 @@ public class ServiceHandler
// from the published map
}
+ /**
+ * The pinger may have died while we weren't looking. Registered services take care
+ * of themselves from handleModifiedServices, but we know very little about implicit
+ * services so we walk them all and make their ServiceSet keep them clean.
+ */
+ protected void handleImplicitServices()
+ {
+ ArrayList<String> keys = serviceStateHandler.getServiceNames();
+ for ( String k : keys ) {
+ ServiceSet sset = serviceStateHandler.getServiceByName(k);
+ if ( sset.isImplicit() ) {
+ sset.establish();
+ }
+ }
+ }
+
protected void handleModifiedServices(HashMap<DuccId, IDuccWork> work)
{
String methodName = "handleModifiedServices";
@@ -767,7 +787,11 @@ public class ServiceHandler
}
for ( int i = 0; i < wanted; i++ ) {
- sset.start();
+ if ( sset.isStartable() ) {
+ sset.start();
+ } else {
+ sset.establish(); // this will just start the ping thread
+ }
}
@@ -918,7 +942,7 @@ public class ServiceHandler
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
- if ( instances >= 0 ) {
+ if ( instances > 0 ) {
sset.setNInstances(instances); // also persists instances
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java Tue Jan 15 17:37:20 2013
@@ -31,13 +31,13 @@ import org.apache.uima.ducc.common.boot.
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.utils.DuccCollectionUtils;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.MissingPropertyException;
import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
import org.apache.uima.ducc.common.utils.Version;
-import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
-import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
@@ -52,9 +52,10 @@ import org.apache.uima.ducc.transport.ev
import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
-import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
import org.apache.uima.ducc.transport.event.sm.ServiceMap;
@@ -84,6 +85,7 @@ public class ServiceManagerComponent
static int meta_ping_rate = 60000; // interval in ms to ping the service
static int meta_ping_stability = 5; // number of missed pings before we mark the service down
static int meta_ping_timeout = 500; // timeout on ping
+ static String default_ping_class;
private String state_dir = null;
private String state_file = null;
@@ -195,6 +197,7 @@ public class ServiceManagerComponent
meta_ping_rate = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.rate", meta_ping_rate);
meta_ping_timeout = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.timeout", meta_ping_timeout);
meta_ping_stability = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.stability", meta_ping_stability);
+ default_ping_class = SystemPropertyResolver.getStringProperty("ducc.sm.default.uima-as.ping.class", UimaAsPing.class.getName());
logger.info(methodName, null, "------------------------------------------------------------------------------------");
logger.info(methodName, null, "Service Manager starting:");
@@ -306,7 +309,20 @@ public class ServiceManagerComponent
case Service:
localMap.addDuccWork(w);
- newServices.put(w.getDuccId(), w);
+ // An arbitrary process is **almost** the same as a service in terms of how most of DUCC
+ // handles it. In order to transparently reuse all that code it is classified as a
+ // special type of service, "other", which the SM treats as a regular job.
+ switch ( ((IDuccWorkService)w).getServiceDeploymentType() )
+ {
+ case uima:
+ case custom:
+ newServices.put(w.getDuccId(), w);
+ break;
+ case other:
+ newJobs.put(w.getDuccId(), w);
+ break;
+ }
+
break;
default:
@@ -328,7 +344,16 @@ public class ServiceManagerComponent
case Service:
localMap.removeDuccWork(w.getDuccId());
- deletedServices.put(w.getDuccId(), w);
+ switch ( ((IDuccWorkService)w).getServiceDeploymentType() )
+ {
+ case uima:
+ case custom:
+ deletedServices.put(w.getDuccId(), w);
+ break;
+ case other:
+ deletedJobs.put(w.getDuccId(), w);
+ break;
+ }
break;
default:
@@ -354,8 +379,17 @@ public class ServiceManagerComponent
break;
case Service:
- modifiedServices.put(l.getDuccId(), l);
localMap.addDuccWork(l);
+ switch ( ((IDuccWorkService)l).getServiceDeploymentType() )
+ {
+ case uima:
+ case custom:
+ modifiedServices.put(l.getDuccId(), l);
+ break;
+ case other:
+ modifiedJobs.put(l.getDuccId(), l);
+ break;
+ }
break;
default:
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java Tue Jan 15 17:37:20 2013
@@ -31,7 +31,7 @@ import org.apache.commons.cli.OptionBuil
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-import org.apache.uima.ducc.cli.AServicePing;
+import org.apache.uima.ducc.common.AServicePing;
import org.apache.uima.ducc.common.ServiceStatistics;
@@ -46,6 +46,11 @@ import org.apache.uima.ducc.common.Servi
public class ServicePingMain
implements SmConstants
{
+
+ boolean debug = false;
+ int error_max = 10;
+ int error_count = 0;
+
public ServicePingMain()
{
@@ -126,16 +131,22 @@ public class ServicePingMain
Class cls = Class.forName(cl);
pinger = (AServicePing) cls.newInstance();
pinger.init(ep);
- } catch (ClassNotFoundException e) {
- print(e);
- } catch (InstantiationException e) {
- print(e);
- } catch (IllegalAccessException e) {
- print(e);
- }
+ } catch (Exception e) {
+ //print(e); // To the logs
+ e.printStackTrace();
+ }
return pinger;
}
+ void handleError(Throwable t)
+ {
+ t.printStackTrace();
+ if ( ++error_count >= error_max ) {
+ System.out.println("Exceeded error count. Exiting.");
+ System.exit(1);
+ }
+ }
+
//
// 1. Instantiate the pinger if possible.
// 2. Read ducc.proeprties to find the ping interval
@@ -153,7 +164,7 @@ public class ServicePingMain
CommandLineParser parser = new PosixParser();
CommandLine commandLine = null;
-
+ ServiceStatistics default_statistics = new ServiceStatistics(false, false, "<N/A>");
try {
commandLine = parser.parse(options, args);
@@ -190,7 +201,6 @@ public class ServicePingMain
return;
}
- ServiceStatistics defaultStatistics = new ServiceStatistics();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(sock_out);
@@ -207,7 +217,7 @@ public class ServicePingMain
}
while ( true ) {
- print("ServicePingMeta starts ping.");
+ if ( debug ) print("ServicePingMeta starts ping.");
byte[] cmd = new byte[1];
cmd[0] = 0;
@@ -215,10 +225,9 @@ public class ServicePingMain
try {
eof = sock_in.read(cmd);
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ handleError(e);
}
- print("Read cmd", new String(cmd), "eof", eof);
+ if ( debug ) print("Read cmd", new String(cmd), "eof", eof);
if ( eof == -1 ) {
print("EOF on input pipe. Exiting");
@@ -228,13 +237,11 @@ public class ServicePingMain
try {
if ( cmd[0] == 'P' ) {
- boolean p = custom.ping();
ServiceStatistics ss = custom.getStatistics();
if ( ss == null ) {
- ss = defaultStatistics;
+ ss = default_statistics;
}
- ss.setPing(p);
- print("Ping is set to " + ss.getPing());
+ // print("Is alive: " + ss.isAlive());
oos.writeObject(ss);
oos.flush();
@@ -251,8 +258,7 @@ public class ServicePingMain
System.err.println("Invalid command recieved: " + Byte.toString(cmd[0]));
}
} catch (Throwable e) {
- // TODO Auto-generated catch block
- e.printStackTrace(); // this is defined to go to stderr
+ handleError(e);
}
}
}
@@ -262,5 +268,5 @@ public class ServicePingMain
ServicePingMain wrapper = new ServicePingMain();
wrapper.start(args);
}
-
+
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Tue Jan 15 17:37:20 2013
@@ -26,8 +26,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
-import java.net.URLConnection;
-import java.net.URLStreamHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -110,12 +108,6 @@ public class ServiceSet
ServiceClass service_class = ServiceClass.Undefined;
ServiceState service_state = ServiceState.Undefined;;
- String custom_classpath;
- String custom_meta_class;
- String[] custom_meta_parms;
-
- long meta_ping_timeout = 500;
-
// structures to manage service linger after it exits
Timer timer = null;
LingerTask linger = null;
@@ -132,6 +124,22 @@ public class ServiceSet
this.service_type = ServiceType.UimaAs;
this.service_class = ServiceClass.Implicit;
+
+ // need job props and meta props so pinger works
+ // job props: service_ping_class, service_ping_classpath, working_directory, log_directory
+ // meta props: endpoint, user
+ job_props = new DuccProperties();
+ job_props.put("service_ping_class", ServiceManagerComponent.default_ping_class);
+ job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
+ job_props.put("service_ping_dolog", "false");
+ job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
+ job_props.put("working_directory", System.getProperty("user.dir")); // whatever my current dir is
+ job_props.put("log_directory", System.getProperty("user.dir") + "/../logs");
+ job_props.put("service_ping_jvm_args", "-Xmx50M");
+
+ meta_props = new DuccProperties();
+ meta_props.put("user", System.getProperty("user.name"));
+ meta_props.put("endpoint", key);
}
//
@@ -139,12 +147,16 @@ public class ServiceSet
//
public ServiceSet(DuccId id, String key, String[] independentServices)
{
+ // deprecating for now
+ throw new IllegalStateException("Submitted services not supported");
+ /**
this.key = key;
this.implementors.put(id, JobState.Undefined);
this.independentServices = independentServices;
this.service_class = ServiceClass.Submitted;
parseEndpoint(key);
+ */
}
//
@@ -179,6 +191,25 @@ public class ServiceSet
parseIndependentServices();
+ if ( service_type == ServiceType.UimaAs ) {
+ if ( ! job_props.containsKey("service_ping_class" ) ) {
+ job_props.put("service_ping_class", ServiceManagerComponent.default_ping_class);
+ job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
+ // this lets us turn on debug logging for the default pinger
+ if ( ! job_props.containsKey("service_ping_dolog")) {
+ job_props.put("service_ping_dolog", "false");
+ }
+ job_props.put("service_ping_jvm_args", "-Xmx50M");
+ }
+ }
+
+ if ( !job_props.containsKey("service_ping_timeout") ) {
+ job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
+ }
+ if ( ! job_props.containsKey("service_ping_dolog") ) {
+ job_props.put("service_ping_dolog", "true"); // unless we fill in the default pinger
+ }
+
//UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
//UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
// there are a couple junky messages that slip by the above configurations. turn the whole danged thing off.
@@ -293,6 +324,21 @@ public class ServiceSet
}
/**
+ * A service is startable if it's UIMA-AS, or if it's CUSTOM and has a process_executable
+ * associated with it. A non-startable CUSTOM service may have only a pinger.
+ */
+ boolean isStartable()
+ {
+ switch ( service_type ) {
+ case UimaAs:
+ return true;
+ case Custom:
+ return job_props.containsKey("process_executable");
+ }
+ return false; // redundant but needed for compilation
+ }
+
+ /**
* At boot only ... synchronize my state with published OR state.
*/
void synchronizeImplementors(Map<DuccId, JobState> work)
@@ -529,10 +575,13 @@ public class ServiceSet
}
// stop the pinger if no longer needed
- if ( (references.size() == 0) && (isImplicit() || isCustom()) ) { // non-implicit are stopped when the implementors go away
- stopPingThread();
+ if ( (references.size() == 0) && // nothing left
+ ( isImplicit() || ( isCustom() && !isStartable()) ) // implicit UIMA-AS || implicit CUSTOM
+ )
+ {
+ stopPingThread(); // must stop pinger because there's no state machine
+ // to do it for us in this situation
}
- // If it's registered, and nrefs == 0, and implicit_start is true, we need to stop the service
return references.size();
}
@@ -611,7 +660,6 @@ public class ServiceSet
}
} else {
if ( service_type == ServiceType.Custom ) {
- // ugly, until we're actually starting the custom process as well
startPingThread();
} else {
int needed = Math.max(0, instances - friendly_ids.size());
@@ -784,18 +832,10 @@ public class ServiceSet
if ( serviceMeta != null ) return; // don't start multiple times.
try {
- switch ( service_type ) {
- case UimaAs:
- logger.info(methodName, id, "Starting UIMA-AS monitor.");
- serviceMeta = new UimaServiceMeta(this, endpoint, broker_host, broker_jmx_port);
- break;
- case Custom:
- logger.info(methodName, id, "Starting CUSTOM monitor.");
- serviceMeta = new CustomServiceMeta(this);
- break;
- }
+ logger.info(methodName, id, "Starting ping/monitor.");
+ serviceMeta = new PingDriver(this);
} catch ( Throwable t ) {
- logger.error(methodName, id, "Cannot start CUSTOM monitor.", t);
+ logger.error(methodName, id, "Cannot instantiate ping/monitor.", t);
return;
}
@@ -804,6 +844,18 @@ public class ServiceSet
t.start();
}
+ synchronized void pingExited()
+ {
+ String methodName = "pingExited";
+ if ( serviceMeta != null ) {
+ logger.warn(methodName, id, "Pinger exited voluntarily, setting state to Undefined. Endpoint", endpoint);
+ service_state = ServiceState.Undefined; // not really sure what state is. it will be
+ // checked and updated next run through the
+ // main state machine, and maybe ping restarted.
+ serviceMeta = null;
+ }
+
+ }
public synchronized void stopPingThread()
{
@@ -877,10 +929,10 @@ public class ServiceSet
{
String methodName = "start";
- if ( service_type == ServiceType.Custom ) {
- establish();
- return;
- }
+// if ( service_type == ServiceType.Custom ) {
+// establish();
+// return;
+// }
this.stopped = false; // for registered
@@ -894,7 +946,7 @@ public class ServiceSet
System.getProperty("ducc.jvm"),
"-cp",
System.getProperty("java.class.path"),
- "org.apache.uima.ducc.sm.cli.DuccServiceSubmit",
+ "org.apache.uima.ducc.cli.DuccServiceSubmit",
"--specification",
props_filename
};
@@ -989,7 +1041,7 @@ public class ServiceSet
System.getProperty("ducc.jvm"),
"-cp",
System.getProperty("java.class.path"),
- "org.apache.uima.ducc.sm.cli.DuccServiceCancel",
+ "org.apache.uima.ducc.cli.DuccServiceCancel",
"--id",
id.toString()
};
@@ -1151,17 +1203,6 @@ public class ServiceSet
return sd;
}
- class TcpStreamHandler
- extends URLStreamHandler
- {
- TcpStreamHandler() {}
-
- public URLConnection openConnection(URL u)
- {
- //throw new Exception("This protocol handler isn't expected to actually work.");
- return null;
- }
- }
/**
* For debugging, so it's easier to identify this guy in the eclipse debugger.
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java?rev=1433548&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java Tue Jan 15 17:37:20 2013
@@ -0,0 +1,28 @@
+package org.apache.uima.ducc.sm;
+
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+
+/**
+ * Placeholder protocol handler that never opens a connection.
+ *
+ * UimaAsPing hands an instance to the three-argument URL constructor so that the
+ * broker URL can be parsed (it reads the host from the result).
+ */
+class TcpStreamHandler extends URLStreamHandler {
+
+    TcpStreamHandler() {
+    }
+
+    /**
+     * Always returns null; this protocol handler isn't expected to actually work.
+     *
+     * @param url ignored
+     * @return null, unconditionally
+     */
+    @Override
+    public URLConnection openConnection(URL url) {
+        return null;
+    }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java?rev=1433548&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java Tue Jan 15 17:37:20 2013
@@ -0,0 +1,161 @@
+package org.apache.uima.ducc.sm;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.ducc.common.AServicePing;
+import org.apache.uima.ducc.common.ServiceStatistics;
+import org.apache.uima.ducc.common.UimaAsServiceMonitor;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.util.Level;
+
+/**
+ * Pinger for a UIMA-AS service: probes the service by initializing a UIMA-AS client
+ * against its endpoint, and collects statistics from a UimaAsServiceMonitor.
+ */
+public class UimaAsPing
+    extends AServicePing
+{
+
+    String ep;                      // full endpoint string exactly as passed to init()
+
+    String endpoint;                // queue name parsed out of ep
+    String broker;                  // broker URL parsed out of ep
+    int meta_timeout;               // from ducc.sm.meta.ping.timeout; passed to the client as GetMetaTimeout
+
+    String broker_host;             // host portion of the broker URL
+    int broker_jmx_port;            // from ducc.sm.meta.jmx.port, default 1099
+    boolean connected;              // true once monitor.init() has succeeded
+    UimaAsServiceMonitor monitor;
+
+    /**
+     * Parses the endpoint, reads the ping configuration from system properties,
+     * creates the monitor and makes a first attempt to initialize it.
+     *
+     * @param ep endpoint of the form UIMA-AS:queuename:broker
+     * @throws IllegalArgumentException if ep lacks the queue/broker separator, the broker
+     *         is not a parseable URL, or ducc.sm.meta.ping.timeout is not set
+     * @throws NumberFormatException if ducc.sm.meta.ping.timeout is not an integer
+     */
+    public void init(String ep)
+        throws Exception
+    {
+        this.ep = ep;
+
+        // Ep is of the form UIMA-AS:queuename:broker
+        int ndx = ep.indexOf(":");
+        ep = ep.substring(ndx+1);
+        ndx = ep.indexOf(":");
+        if ( ndx < 0 ) {
+            // Without this check substring(0, -1) fails with a bare StringIndexOutOfBoundsException.
+            throw new IllegalArgumentException("Invalid endpoint, expected UIMA-AS:queuename:broker: " + this.ep);
+        }
+
+        this.endpoint = ep.substring(0, ndx).trim();
+        this.broker = ep.substring(ndx+1).trim();
+
+        // broker is a URL that we need to parse in order to get the actual host and port
+        // for jmx
+        URL url;
+        try {
+            url = new URL(null, broker, new TcpStreamHandler());
+        } catch (MalformedURLException e) {
+            // Keep the cause so the parser's own diagnostic is not lost.
+            throw new IllegalArgumentException("Invalid broker URL: " + broker, e);
+        }
+        broker_host = url.getHost();
+        // not needed here fyi broker_port = url.getPort();
+
+        String to = System.getProperty("ducc.sm.meta.ping.timeout");
+        if ( to == null ) {
+            // Integer.parseInt(null) would otherwise fail without naming the property.
+            throw new IllegalArgumentException("Required system property ducc.sm.meta.ping.timeout is not set");
+        }
+        meta_timeout = Integer.parseInt(to);
+
+        broker_jmx_port = SystemPropertyResolver.getIntProperty("ducc.sm.meta.jmx.port", 1099);
+        this.monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
+        init_monitor();
+
+        //UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
+        //UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
+        // there are a couple junky messages that slip by the above configurations. turn the whole danged thing off.
+        UIMAFramework.getLogger().setLevel(Level.OFF);
+    }
+
+    /**
+     * No-op. The UIMA-AS client is created and stopped within each getStatistics() call.
+     */
+    public void stop()
+    {
+    }
+
+    /**
+     * Initializes the monitor unless an earlier attempt already succeeded. A failure is
+     * printed and leaves connected false, so the next call tries again.
+     */
+    private synchronized void init_monitor()
+    {
+        if ( ! connected ) {
+            try {
+                System.out.println("Initializing monitor");
+                monitor.init(ep);
+                connected = true;
+                System.out.println("Monitor initialized");
+            } catch (Throwable t ) {
+                connected = false;
+                t.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Pings the service once.
+     *
+     * Starts from the monitor's statistics when the monitor is connected (otherwise from a
+     * placeholder created with false, false, "<NA>"), then initializes a throwaway UIMA-AS
+     * client against the endpoint. If initialization succeeds the statistics are marked alive
+     * and healthy; on ResourceInitializationException the failure is printed and they are
+     * returned as they were. The client is stopped in either case.
+     *
+     * @return the statistics for this ping
+     */
+    public ServiceStatistics getStatistics()
+    {
+        ServiceStatistics statistics = new ServiceStatistics(false, false, "<NA>");
+
+        // Instantiate Uima AS Client
+        BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+        Map<String, Object> appCtx = new HashMap<String, Object>();
+        appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
+        appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
+        appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, meta_timeout);
+
+        try {
+            init_monitor();
+            if ( connected ) {
+                statistics = monitor.getStatistics();
+            }
+
+            // this sends GetMeta request and blocks waiting for a reply
+            uimaAsEngine.initialize(appCtx);
+            statistics.setAlive(true);
+            statistics.setHealthy(true);
+            // System.out.println("getMeta ok: " + ep);
+
+        } catch( ResourceInitializationException e) {
+            System.out.println("Cannot issue getMeta: " + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            uimaAsEngine.stop();
+        }
+
+        return statistics;
+    }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
------------------------------------------------------------------------------
svn:eol-style = native