You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2013/02/06 20:35:53 UTC
svn commit: r1443158 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java
Author: degenaro
Date: Wed Feb 6 19:35:53 2013
New Revision: 1443158
URL: http://svn.apache.org/viewvc?rev=1443158&view=rev
Log:
UIMA-2642 Replace Camel listening mechanism with WS polling mechanism for Job status monitoring
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java?rev=1443158&r1=1443157&r2=1443158&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java Wed Feb 6 19:35:53 2013
@@ -19,22 +19,17 @@
package org.apache.uima.ducc.cli;
import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.camel.component.ActiveMQComponent;
-import org.apache.camel.Body;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -43,20 +38,17 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.uima.ducc.api.DuccMessage;
import org.apache.uima.ducc.api.IDuccMessageProcessor;
+import org.apache.uima.ducc.common.json.MonitorInfo;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
-import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
-import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
-import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
-import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
-import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
-import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+import com.google.gson.Gson;
/**
* Monitor a DUCC job
*/
-public class DuccJobMonitor extends DuccUi implements DuccEventDelegateListener {
+public class DuccJobMonitor extends DuccUi {
+private Thread main = null;
private static final int RC_UNKNOWN = -1;
private static final int RC_SUCCESS = 0;
@@ -66,44 +58,34 @@ public class DuccJobMonitor extends Ducc
private static final String NotFound = "NotFound";
- private AtomicBoolean stopped = new AtomicBoolean(false);
- private AtomicBoolean jobActive = new AtomicBoolean(true);
- private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN);
-
- private String lastMessage = "";
+ private static final String StateRunning = "Running";
+ private static final String StateCompleting = "Completing";
+ private static final String StateCompleted = "Completed";
- private Thread main = null;
-
- private CamelContext context;
- private ActiveMQComponent amqc;
+ private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN);
+ private AtomicBoolean cancel_job_on_interrupt = new AtomicBoolean(false);
- private String broker = DuccUiUtilities.buildBrokerUrl();
- private String endpoint = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider)
- + ":"
- + DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint_type)
- + ":"
- + DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint)
- ;
- private String jmsProvider = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider);
-
private String jobId = null;
- private DuccWorkJob job = null;
private boolean info = true;
+ private boolean error = true;
private boolean debug = false;
- private boolean cancel_job_on_interrupt = false;
- private int milliseconds = 1;
- private int seconds = 1000*milliseconds;
- private int wakeupTime = 60*seconds;
-
- private int MAXLINES = 2000;
+ private boolean timestamp = false;
+
+ private int milliseconds = 1;
+ private int seconds = 1000*milliseconds;
+ private int wakeupInterval = 15*seconds;
+
+ private int urlTimeout = 60*seconds;
private IDuccMessageProcessor duccMessageProcessor = new DuccMessage();
+ DuccPropertiesResolver duccPropertiesResolver;
+
private void debug(String message) {
if(debug) {
- duccMessageProcessor.out(message);
+ duccMessageProcessor.out(timestamp(message));
}
}
@@ -115,10 +97,25 @@ public class DuccJobMonitor extends Ducc
private void info(String message) {
if(info) {
- duccMessageProcessor.out(message);
+ duccMessageProcessor.out(timestamp(message));
}
}
+ private void error(String message) {
+ if(error) {
+ duccMessageProcessor.out(timestamp(message));
+ }
+ }
+
+ private String timestamp(String message) {
+ String tMessage = message;
+ if(timestamp) {
+ String date = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date());
+ tMessage = date+" "+message;
+ }
+ return tMessage;
+ }
+
public DuccJobMonitor() {
}
@@ -154,153 +151,87 @@ public class DuccJobMonitor extends Ducc
.withLongOpt(DuccUiConstants.name_service_endpoint).create());
}
- private void start() {
- context = new DefaultCamelContext();
- amqc = ActiveMQComponent.activeMQComponent(broker);
- context.addComponent(jmsProvider, amqc);
- try {
- context.addRoutes(this.routeBuilderForIncomingRequests(endpoint, this));
- } catch (Exception e) {
- duccMessageProcessor.exception(e);
- }
- try {
- context.start();
- } catch (Exception e) {
- duccMessageProcessor.exception(e);
- }
+ protected void help(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.setWidth(DuccUiConstants.help_width);
+ formatter.printHelp(DuccJobMonitor.class.getName(), options);
+ rc.set(RC_HELP);
+ return;
}
- private void stop() {
- while(!stopped.get()) {
- boolean success = stopped.compareAndSet(false, true);
- if(success) {
- try {
- CamelUtil.stop(context);
- } catch (Exception e) {
- duccMessageProcessor.exception(e);
- }
- }
+ private String getSingleLineStatus(String urlString) {
+ String line = null;
+ URL url = null;
+ try {
+ url = new URL(urlString);
+ URLConnection uc = url.openConnection();
+ uc.setReadTimeout(urlTimeout);
+ BufferedReader br = new BufferedReader(new InputStreamReader(uc.getInputStream()));
+ line = br.readLine();
+ br.close();
+ }
+ catch (MalformedURLException e) {
+ e.printStackTrace();
}
+ catch(IOException e) {
+ e.printStackTrace();
+ }
+ return line;
}
- public synchronized RouteBuilder routeBuilderForIncomingRequests(final String endpoint, final DuccJobMonitor delegate) {
- return new RouteBuilder() {
- public void configure() {
- //System.out.println("..... Defining Router on endpoint:"+endpoint);
- from(endpoint)
- //.unmarshal().xstream()
- .process(new MonitorProcessor())
- .bean(delegate);
- }
- };
- }
-
- public class MonitorProcessor implements Processor {
- public void process( Exchange ex ) {
- //System.out.println("..... Monitor received an event ....");
+ private String getUrlString(String id) {
+ String host = duccPropertiesResolver.getFileProperty("ducc.ws.node");
+ if(host == null) {
+ host = duccPropertiesResolver.getFileProperty("ducc.head");
}
+ String port = duccPropertiesResolver.getFileProperty("ducc.ws.port");
+ String urlString = "http://"+host+":"+port+"/ducc-servlet/proxy-job-status?id="+id;
+ debug(urlString);
+ return urlString;
}
- boolean isWorkCompleted(String v1, String v2) {
- boolean retVal = false;
+ private void adjustWakeupInterval() {
+ String rate = duccPropertiesResolver.getFileProperty("ducc.orchestrator.state.publish.rate");
try {
- int intValue1 = Integer.parseInt(v1);
- int intValue2 = Integer.parseInt(v2);
- if(intValue1 >= 0) {
- if(intValue1 == intValue2) {
- retVal = true;
- }
- }
+ wakeupInterval = Integer.parseInt(rate);
}
catch(Exception e) {
- }
- return retVal;
- }
-
- public void onOrchestratorStateDuccEvent(@Body OrchestratorStateDuccEvent duccEvent) throws Exception {
- synchronized(jobActive) {
- if(jobActive.get()) {
- debug("JobCount:"+duccEvent.getWorkMap().getJobCount());
- job = (DuccWorkJob) duccEvent.getWorkMap().findDuccWork(DuccType.Job, jobId);
- if(job == null) {
- StringBuffer message = new StringBuffer();
- message.append("id:"+jobId);
- message.append(" ");
- message.append("state:"+NotFound);
- info(message.toString());
- jobActive.set(false);
- }
- else {
- JobState jobState = (JobState) job.getStateObject();
- String total = job.getSchedulingInfo().getWorkItemsTotal();
- String completed = job.getSchedulingInfo().getWorkItemsCompleted();
- String error = job.getSchedulingInfo().getWorkItemsError();
- String retry = job.getSchedulingInfo().getWorkItemsRetry();
- StringBuffer messageBuffer = new StringBuffer();
- String message = messageBuffer.toString();
- messageBuffer.append("id:"+jobId);
- messageBuffer.append(" ");
- messageBuffer.append("state:"+jobState);
- switch(jobState) {
- case Completed:
- int count = job.getProcessMap().getAliveProcessCount();
- if(count > 0) {
- messageBuffer.append(" ");
- messageBuffer.append("processes stopping:"+count);
- }
- else {
- jobActive.set(false);
- if(isWorkCompleted(total,completed)) {
- rc.set(RC_SUCCESS);
- }
- else {
- rc.set(RC_FAILURE);
- }
- }
- case Running:
- case Completing:
- messageBuffer.append(" ");
- messageBuffer.append("total:"+total);
- messageBuffer.append(" ");
- messageBuffer.append("done:"+completed);
- messageBuffer.append(" ");
- messageBuffer.append("error:"+error);
- messageBuffer.append(" ");
- messageBuffer.append("retry:"+retry);
- break;
- }
- message = messageBuffer.toString();
- synchronized(lastMessage) {
- if(!message.equals(lastMessage)) {
- info(message.toString());
- lastMessage = message;
- }
- }
- }
- if(!jobActive.get()) {
- main.interrupt();
- }
- }
- else {
- debug("OR publication ignored...job not active");
- }
+ debug(e);
}
}
- @Override
- public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
- throw new RuntimeException();
- }
-
- protected int help(Options options) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.setWidth(DuccUiConstants.help_width);
- formatter.printHelp(DuccJobMonitor.class.getName(), options);
- return 1;
+ private String details(MonitorInfo monitorInfo) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(" ");
+ sb.append("total:");
+ sb.append(monitorInfo.total);
+ sb.append(" ");
+ sb.append("done:");
+ sb.append(monitorInfo.done);
+ sb.append(" ");
+ sb.append("error:");
+ sb.append(monitorInfo.error);
+ sb.append(" ");
+ sb.append("retry:");
+ sb.append(monitorInfo.retry);
+ sb.append(" ");
+ sb.append("procs:");
+ sb.append(monitorInfo.procs);
+ return sb.toString();
}
public int run(String[] args) throws Exception {
/*
+ * require DUCC_HOME
+ */
+ String ducc_home_key = "DUCC_HOME";
+ String ducc_home = System.getenv(ducc_home_key);
+ if(ducc_home == null) {
+ duccMessageProcessor.err("missing required environment variable: "+ducc_home_key);
+ rc.set(RC_FAILURE);
+ return rc.get();
+ }
+ /*
* parser is not thread safe?
*/
synchronized(DuccUi.class) {
@@ -312,19 +243,12 @@ public class DuccJobMonitor extends Ducc
* give help & exit when requested
*/
if (commandLine.hasOption(DuccUiConstants.name_help)) {
- return help(options);
+ help(options);
+ return rc.get();
}
if(commandLine.getOptions().length == 0) {
- return help(options);
- }
- /*
- * require DUCC_HOME
- */
- String ducc_home_key = "DUCC_HOME";
- String ducc_home = System.getenv(ducc_home_key);
- if(ducc_home == null) {
- duccMessageProcessor.err("missing required environment variable: "+ducc_home_key);
- return 1;
+ help(options);
+ return rc.get();
}
/*
* detect duplicate options
@@ -336,20 +260,19 @@ public class DuccJobMonitor extends Ducc
* timestamp
*/
if (commandLine.hasOption(DuccUiConstants.name_timestamp)) {
- //logger = logger_ts;
+ timestamp = true;
}
/*
* verbosity
*/
if (commandLine.hasOption(DuccUiConstants.name_debug)) {
- //logger.setLevel(Level.DEBUG);
debug = true;
}
/*
* cancel job enabled
*/
if (commandLine.hasOption(DuccUiConstants.name_monitor_cancel_job_on_interrupt)) {
- cancel_job_on_interrupt = true;
+ cancel_job_on_interrupt.set(true);
}
/*
* job id
@@ -364,130 +287,117 @@ public class DuccJobMonitor extends Ducc
rc.set(RC_HELP);
return rc.get();
}
- /*
- * broker & endpoint
- */
- if (commandLine.hasOption(DuccUiConstants.name_service_broker)) {
- broker = commandLine.getOptionValue(DuccUiConstants.name_service_broker);
- }
- if (commandLine.hasOption(DuccUiConstants.name_service_endpoint)) {
- endpoint = commandLine.getOptionValue(DuccUiConstants.name_service_endpoint);
- }
}
- /*
- * echo
- */
- debug("jmsProvider="+jmsProvider);
- debug("broker="+broker);
- debug("endpoint="+endpoint);
- debug("id="+jobId);
main = Thread.currentThread();
Thread killer = new Killer(main);
Runtime.getRuntime().addShutdownHook(killer);
- start();
-
- StringBuffer msgName = new StringBuffer();
- msgName.append("id:"+jobId);
- msgName.append(" ");
- msgName.append("location:");
- msgName.append(ManagementFactory.getRuntimeMXBean().getName());
- info(msgName.toString());
+ duccPropertiesResolver = DuccPropertiesResolver.getInstance();
- debug("monitor start");
- while(jobActive.get()) {
- try {
- Thread.sleep(wakeupTime);
- } catch (InterruptedException e) {
- debug(e);
- }
- debug("monitor active...");
- }
- debug("monitor stop");
- stop();
+ adjustWakeupInterval();
- Runtime.getRuntime().removeShutdownHook(killer);
-
- showErrors();
-
- StringBuffer msgRc = new StringBuffer();
- msgRc.append("id:"+jobId);
- msgRc.append(" ");
- msgRc.append("rc:"+rc.get());
- info(msgRc.toString());
-
- return rc.get();
- }
-
- private void dumpFile(String fileName) {
- try {
- String prefix = "id:"+jobId+" ";
- String data = "file:"+fileName;
- info(prefix+data);
- FileInputStream fis = new FileInputStream(fileName);
- DataInputStream dis = new DataInputStream(fis);
- BufferedReader br = new BufferedReader(new InputStreamReader(dis));
- int maxErrors = 1;
- int maxLines = MAXLINES;
- int lines = 0;
- int errors = 0;
- while(true) {
- String line = br.readLine();
- lines ++;
- if(line == null) {
- break;
+ boolean observer = true;
+ String urlString = getUrlString(jobId);
+ String lastMessage = "";
+ String thisMessage = "";
+
+ StringBuffer message = new StringBuffer();
+
+ message.append("id:"+jobId);
+ message.append(" ");
+ message.append("location:");
+ message.append(ManagementFactory.getRuntimeMXBean().getName());
+ info(message.toString());
+
+ while(observer) {
+
+ String json = getSingleLineStatus(urlString);
+ debug(json);
+
+ if(json != null) {
+ Gson gson = new Gson();
+ MonitorInfo monitorInfo = gson.fromJson(json, MonitorInfo.class);
+
+ int stateCount = monitorInfo.stateSequence.size();
+ debug("states:"+stateCount);
+ if(stateCount <= 0) {
+ message = new StringBuffer();
+ message.append("id:"+jobId);
+ message.append(" ");
+ message.append("state:"+NotFound);
+ thisMessage = message.toString();
+ info(thisMessage);
+ return rc.get();
}
- String tline = line.trim();
- if(tline.length() == 0) {
- continue;
+
+ String state = "";
+ Iterator<String> states = monitorInfo.stateSequence.iterator();
+ while(states.hasNext()) {
+ state = states.next();
}
- String[] tokens = tline.split(" ");
- if(tokens.length > 5) {
- if(tokens[5].trim().equals("ERROR")) {
- errors++;
- }
+
+ message = new StringBuffer();
+ message.append("id:"+jobId);
+ message.append(" ");
+ message.append("state:"+state);
+
+ if(state.equals(StateRunning)) {
+ message.append(details(monitorInfo));
+ }
+ else if(state.equals(StateCompleting)) {
+ cancel_job_on_interrupt.set(false);
+ message.append(details(monitorInfo));
}
- info(prefix+line);
- if(lines > maxLines) {
- info(prefix+"more...");
- break;
+ else if(state.equals(StateCompleted)) {
+ cancel_job_on_interrupt.set(false);
+ message.append(details(monitorInfo));
}
- if(errors > maxErrors) {
- info(prefix+"more...");
- break;
+
+ thisMessage = message.toString();
+ if(!thisMessage.equals(lastMessage)) {
+ info(thisMessage);
+ lastMessage = thisMessage;
+ }
+
+ if(state.equals(StateCompleted)) {
+ if(monitorInfo.procs.equals("0")) {
+ if(monitorInfo.total.equals(monitorInfo.done)) {
+ message = new StringBuffer();
+ message.append("id:"+jobId);
+ message.append(" ");
+ message.append("rc:"+RC_SUCCESS);
+ thisMessage = message.toString();
+ info(thisMessage);
+ rc.set(RC_SUCCESS);
+ return rc.get();
+ }
+ else {
+ message = new StringBuffer();
+ message.append("id:"+jobId);
+ message.append(" ");
+ message.append("rc:"+RC_FAILURE);
+ thisMessage = message.toString();
+ info(thisMessage);
+ rc.set(RC_FAILURE);
+ return rc.get();
+ }
+ }
}
}
- br.close();
- dis.close();
- fis.close();
- }
- catch(Exception e) {
- debug(e);
- }
- }
-
- private void showErrors() {
- if(job != null) {
- StringBuffer sb = new StringBuffer();
- sb.append(job.getLogDirectory());
- if(!job.getLogDirectory().endsWith(File.separator)) {
- sb.append(File.separator);
+ else {
+ error("error: timeout accessing "+urlString);
}
- sb.append(job.getDuccId().getFriendly()+File.separator);
- sb.append("jd.err.log");
+
try {
- String fileName = sb.toString();
- File file = new File(fileName);
- if(file.canRead()) {
- dumpFile(fileName);
- }
- }
- catch(Exception e) {
+ Thread.sleep(wakeupInterval);
+ } catch (InterruptedException e) {
debug(e);
}
}
+
+ return rc.get();
}
private class Killer extends Thread {
@@ -497,7 +407,7 @@ public class DuccJobMonitor extends Ducc
public void run() {
StringBuffer message = new StringBuffer();
- if(cancel_job_on_interrupt) {
+ if(cancel_job_on_interrupt.get()) {
message.append("killer: cancel");
cancel();
}
@@ -513,8 +423,8 @@ public class DuccJobMonitor extends Ducc
ArrayList<String> arrayList = new ArrayList<String>();
arrayList.add("--"+DuccUiConstants.name_job_id);
arrayList.add(jobId);
- arrayList.add("--"+DuccUiConstants.name_service_broker);
- arrayList.add(broker);
+ arrayList.add("--"+DuccUiConstants.name_reason);
+ arrayList.add("\"submitter was terminated via interrupt\"");
String[] argList = arrayList.toArray(new String[0]);
DuccJobCancel duccJobCancel = new DuccJobCancel();
int retVal = duccJobCancel.run(argList);
@@ -526,7 +436,6 @@ public class DuccJobMonitor extends Ducc
duccMessageProcessor.exception(e);
}
}
-
public static void main(String[] args) {
try {
DuccJobMonitor duccJobMonitor = new DuccJobMonitor();