You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2013/02/06 20:35:53 UTC

svn commit: r1443158 - /uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java

Author: degenaro
Date: Wed Feb  6 19:35:53 2013
New Revision: 1443158

URL: http://svn.apache.org/viewvc?rev=1443158&view=rev
Log:
UIMA-2642 Replace Camel listening mechanism with WS polling mechanism for Job status monitoring

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java?rev=1443158&r1=1443157&r2=1443158&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java Wed Feb  6 19:35:53 2013
@@ -19,22 +19,17 @@
 package org.apache.uima.ducc.cli;
 
 import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.camel.component.ActiveMQComponent;
-import org.apache.camel.Body;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -43,20 +38,17 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.uima.ducc.api.DuccMessage;
 import org.apache.uima.ducc.api.IDuccMessageProcessor;
+import org.apache.uima.ducc.common.json.MonitorInfo;
 import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
-import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
-import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
-import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
-import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
-import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
-import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
 
+import com.google.gson.Gson;
 
 /**
  * Monitor a DUCC job
  */
 
-public class DuccJobMonitor extends DuccUi implements DuccEventDelegateListener {
+public class DuccJobMonitor extends DuccUi {
+private Thread main = null;
 	
 	private static final int RC_UNKNOWN = -1;
 	private static final int RC_SUCCESS = 0;
@@ -66,44 +58,34 @@ public class DuccJobMonitor extends Ducc
 	
 	private static final String NotFound = "NotFound";
 	
-	private AtomicBoolean stopped = new AtomicBoolean(false);
-	private AtomicBoolean jobActive = new AtomicBoolean(true);
-	private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN);
-	
-	private String lastMessage = "";
+	private static final String StateRunning 	= "Running";
+	private static final String StateCompleting = "Completing";
+	private static final String StateCompleted 	= "Completed";
 	
-	private Thread main = null;
-	
-	private CamelContext context;
-	private ActiveMQComponent amqc;
+	private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN);
+	private AtomicBoolean cancel_job_on_interrupt = new AtomicBoolean(false);
 	
-	private String broker = DuccUiUtilities.buildBrokerUrl();
-	private String endpoint = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider)
-    						+ ":"
-    						+ DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint_type)
-    						+ ":"
-    						+ DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint)
-    						;
-	private String jmsProvider = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider);
-
 	private String jobId = null;
-	private DuccWorkJob job = null;
 	
 	private boolean info = true;
+	private boolean error = true;
 	private boolean debug = false;
-	private boolean cancel_job_on_interrupt = false;
 	
-	private int milliseconds = 1;
-	private int seconds		 = 1000*milliseconds;
-	private int wakeupTime 	 = 60*seconds;
-
-	private int MAXLINES = 2000;
+	private boolean timestamp = false;
+	
+	private int milliseconds 	= 1;
+	private int seconds		 	= 1000*milliseconds;
+	private int wakeupInterval 	= 15*seconds;
+	
+	private int urlTimeout = 60*seconds;
 	
 	private IDuccMessageProcessor duccMessageProcessor = new DuccMessage();
 	
+	DuccPropertiesResolver duccPropertiesResolver;
+
 	private void debug(String message) {
 		if(debug) {
-			duccMessageProcessor.out(message);
+			duccMessageProcessor.out(timestamp(message));
 		}
 	}
 	
@@ -115,10 +97,25 @@ public class DuccJobMonitor extends Ducc
 	
 	private void info(String message) {
 		if(info) {
-			duccMessageProcessor.out(message);
+			duccMessageProcessor.out(timestamp(message));
 		}
 	}
 	
+	private void error(String message) {
+		if(error) {
+			duccMessageProcessor.out(timestamp(message));
+		}
+	}
+	
+	private String timestamp(String message) {
+		String tMessage = message;
+		if(timestamp) {
+			String date = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date());
+			tMessage = date+" "+message;
+		}
+		return tMessage;
+	}
+	
 	public DuccJobMonitor() {
 	}
 	
@@ -154,153 +151,87 @@ public class DuccJobMonitor extends Ducc
 				.withLongOpt(DuccUiConstants.name_service_endpoint).create());
 	}
 	
-	private void start() {
-		context = new DefaultCamelContext();
-		amqc = ActiveMQComponent.activeMQComponent(broker);
-        context.addComponent(jmsProvider, amqc);
-        try {
-			context.addRoutes(this.routeBuilderForIncomingRequests(endpoint, this));
-		} catch (Exception e) {
-			duccMessageProcessor.exception(e);
-		}
-		try {
-			context.start();
-		} catch (Exception e) {
-			duccMessageProcessor.exception(e);
-		}
+	protected void help(Options options) {
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setWidth(DuccUiConstants.help_width);
+		formatter.printHelp(DuccJobMonitor.class.getName(), options);
+		rc.set(RC_HELP);
+		return;
 	}
 	
-	private void stop() {
-		while(!stopped.get()) {
-			boolean success = stopped.compareAndSet(false, true);
-			if(success) {
-				try {
-					CamelUtil.stop(context);
-				} catch (Exception e) {
-					duccMessageProcessor.exception(e);
-				}
-			}
+	private String getSingleLineStatus(String urlString) {
+		String line = null;
+		URL url = null;
+		try {
+		    url = new URL(urlString);
+		    URLConnection uc = url.openConnection();
+		    uc.setReadTimeout(urlTimeout); 
+		    BufferedReader br = new BufferedReader(new InputStreamReader(uc.getInputStream()));
+		    line = br.readLine();
+		    br.close();
+		} 
+		catch (MalformedURLException e) {
+		    e.printStackTrace();
 		}
+		catch(IOException e) {
+			e.printStackTrace();
+		} 
+		return line;
 	}
 	
-	public synchronized RouteBuilder routeBuilderForIncomingRequests(final String endpoint, final DuccJobMonitor delegate) {
-        return new RouteBuilder() {
-            public void configure() {
-            	//System.out.println("..... Defining Router on endpoint:"+endpoint);
-            	from(endpoint)
-            	//.unmarshal().xstream()
-            	.process(new MonitorProcessor())
-            	.bean(delegate);
-            }
-        };
-	}
-	
-	public class MonitorProcessor implements Processor {
-		public void process( Exchange ex ) {
-			//System.out.println("..... Monitor received an event ....");
+	private String getUrlString(String id) {
+		String host = duccPropertiesResolver.getFileProperty("ducc.ws.node");
+		if(host == null) {
+			host = duccPropertiesResolver.getFileProperty("ducc.head");
 		}
+		String port = duccPropertiesResolver.getFileProperty("ducc.ws.port");
+		String urlString = "http://"+host+":"+port+"/ducc-servlet/proxy-job-status?id="+id;
+		debug(urlString);
+		return urlString;
 	}
 	
-	boolean isWorkCompleted(String v1, String v2) {
-		boolean retVal = false;
+	private void adjustWakeupInterval() {
+		String rate = duccPropertiesResolver.getFileProperty("ducc.orchestrator.state.publish.rate");
 		try {
-			int intValue1 = Integer.parseInt(v1);
-			int intValue2 = Integer.parseInt(v2);
-			if(intValue1 >= 0) {
-				if(intValue1 == intValue2) {
-					retVal = true;
-				}
-			}
+			wakeupInterval = Integer.parseInt(rate);
 		}
 		catch(Exception e) {
-		}
-		return retVal;
-	}
-	
-	public void onOrchestratorStateDuccEvent(@Body OrchestratorStateDuccEvent duccEvent) throws Exception {
-		synchronized(jobActive) {
-			if(jobActive.get()) {
-				debug("JobCount:"+duccEvent.getWorkMap().getJobCount());
-				job = (DuccWorkJob) duccEvent.getWorkMap().findDuccWork(DuccType.Job, jobId);
-				if(job == null) {
-					StringBuffer message = new StringBuffer();
-					message.append("id:"+jobId);
-					message.append(" ");
-					message.append("state:"+NotFound);
-					info(message.toString());
-					jobActive.set(false);
-				}
-				else {
-					JobState jobState = (JobState) job.getStateObject();
-					String total = job.getSchedulingInfo().getWorkItemsTotal();
-					String completed = job.getSchedulingInfo().getWorkItemsCompleted();
-					String error = job.getSchedulingInfo().getWorkItemsError();
-					String retry = job.getSchedulingInfo().getWorkItemsRetry();
-					StringBuffer messageBuffer = new StringBuffer();
-					String message = messageBuffer.toString();
-					messageBuffer.append("id:"+jobId);
-					messageBuffer.append(" ");
-					messageBuffer.append("state:"+jobState);
-					switch(jobState) {
-					case Completed:
-						int count = job.getProcessMap().getAliveProcessCount();
-						if(count > 0) {
-							messageBuffer.append(" ");
-							messageBuffer.append("processes stopping:"+count);
-						}
-						else {
-							jobActive.set(false);
-							if(isWorkCompleted(total,completed)) {
-								rc.set(RC_SUCCESS);
-							}
-							else {
-								rc.set(RC_FAILURE);
-							}
-						}
-					case Running:
-					case Completing:
-						messageBuffer.append(" ");
-						messageBuffer.append("total:"+total);
-						messageBuffer.append(" ");
-						messageBuffer.append("done:"+completed);
-						messageBuffer.append(" ");
-						messageBuffer.append("error:"+error);
-						messageBuffer.append(" ");
-						messageBuffer.append("retry:"+retry);
-						break;
-					}
-					message = messageBuffer.toString();
-					synchronized(lastMessage) {
-						if(!message.equals(lastMessage)) {
-							info(message.toString());
-							lastMessage = message;
-						}
-					}
-				}
-				if(!jobActive.get()) {
-					main.interrupt();
-				}	
-			}
-			else {
-				debug("OR publication ignored...job not active");
-			}
+			debug(e);
 		}
 	}
 	
-	@Override
-	public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) {
-		throw new RuntimeException();
-	}
-	
-	protected int help(Options options) {
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setWidth(DuccUiConstants.help_width);
-		formatter.printHelp(DuccJobMonitor.class.getName(), options);
-		return 1;
+	private String details(MonitorInfo monitorInfo) {
+		StringBuffer sb = new StringBuffer();
+		sb.append(" ");
+		sb.append("total:");
+		sb.append(monitorInfo.total);
+		sb.append(" ");
+		sb.append("done:");
+		sb.append(monitorInfo.done);
+		sb.append(" ");
+		sb.append("error:");
+		sb.append(monitorInfo.error);
+		sb.append(" ");
+		sb.append("retry:");
+		sb.append(monitorInfo.retry);
+		sb.append(" ");
+		sb.append("procs:");
+		sb.append(monitorInfo.procs);
+		return sb.toString();
 	}
 	
 	public int run(String[] args) throws Exception {
 		/*
+		 * require DUCC_HOME 
+		 */
+		String ducc_home_key = "DUCC_HOME";
+		String ducc_home = System.getenv(ducc_home_key);
+		if(ducc_home == null) {
+			duccMessageProcessor.err("missing required environment variable: "+ducc_home_key);
+			rc.set(RC_FAILURE);
+			return rc.get();
+		}
+		/*
 		 * parser is not thread safe?
 		 */
 		synchronized(DuccUi.class) {
@@ -312,19 +243,12 @@ public class DuccJobMonitor extends Ducc
 			 * give help & exit when requested
 			 */
 			if (commandLine.hasOption(DuccUiConstants.name_help)) {
-				return help(options);
+				help(options);
+				return rc.get();
 			}
 			if(commandLine.getOptions().length == 0) {
-				return help(options);
-			}
-			/*
-			 * require DUCC_HOME 
-			 */
-			String ducc_home_key = "DUCC_HOME";
-			String ducc_home = System.getenv(ducc_home_key);
-			if(ducc_home == null) {
-				duccMessageProcessor.err("missing required environment variable: "+ducc_home_key);
-				return 1;
+				help(options);
+				return rc.get();
 			}
 			/*
 			 * detect duplicate options
@@ -336,20 +260,19 @@ public class DuccJobMonitor extends Ducc
 			 * timestamp
 			 */
 			if (commandLine.hasOption(DuccUiConstants.name_timestamp)) {
-				//logger = logger_ts;
+				timestamp = true;
 			}
 			/*
 			 * verbosity
 			 */
 			if (commandLine.hasOption(DuccUiConstants.name_debug)) {
-				//logger.setLevel(Level.DEBUG);
 				debug = true;
 			}
 			/*
 			 * cancel job enabled
 			 */
 			if (commandLine.hasOption(DuccUiConstants.name_monitor_cancel_job_on_interrupt)) {
-				cancel_job_on_interrupt = true;
+				cancel_job_on_interrupt.set(true);
 			}
 			/*
 			 * job id
@@ -364,130 +287,117 @@ public class DuccJobMonitor extends Ducc
 				rc.set(RC_HELP);
 				return rc.get();
 			}
-			/*
-			 * broker & endpoint
-			 */
-			if (commandLine.hasOption(DuccUiConstants.name_service_broker)) {
-				broker = commandLine.getOptionValue(DuccUiConstants.name_service_broker);
-			}
-			if (commandLine.hasOption(DuccUiConstants.name_service_endpoint)) {
-				endpoint = commandLine.getOptionValue(DuccUiConstants.name_service_endpoint);
-			}
 		}
-		/* 
-		 * echo
-		 */
-		debug("jmsProvider="+jmsProvider);
-		debug("broker="+broker);
-		debug("endpoint="+endpoint);
-		debug("id="+jobId);
 		
 		main = Thread.currentThread();
 		
 		Thread killer = new Killer(main);
 		Runtime.getRuntime().addShutdownHook(killer);
 		
-		start();
-		
-		StringBuffer msgName = new StringBuffer();
-		msgName.append("id:"+jobId);
-		msgName.append(" ");
-		msgName.append("location:");
-		msgName.append(ManagementFactory.getRuntimeMXBean().getName());
-		info(msgName.toString());
+		duccPropertiesResolver = DuccPropertiesResolver.getInstance();
 		
-		debug("monitor start");
-		while(jobActive.get()) {
-			try {
-				Thread.sleep(wakeupTime);
-			} catch (InterruptedException e) {
-				debug(e);
-			}
-			debug("monitor active...");
-		}
-		debug("monitor stop");
-		stop();
+		adjustWakeupInterval();
 		
-		Runtime.getRuntime().removeShutdownHook(killer);
-		
-		showErrors();
-		
-		StringBuffer msgRc = new StringBuffer();
-		msgRc.append("id:"+jobId);
-		msgRc.append(" ");
-		msgRc.append("rc:"+rc.get());
-		info(msgRc.toString());
-		
-		return rc.get();
-	}
-	
-	private void dumpFile(String fileName) {
-		try {
-			String prefix = "id:"+jobId+" ";
-			String data = "file:"+fileName;
-			info(prefix+data);
-			FileInputStream fis = new FileInputStream(fileName);
-			DataInputStream dis = new DataInputStream(fis);
-			BufferedReader br = new BufferedReader(new InputStreamReader(dis));
-			int maxErrors = 1;
-			int maxLines = MAXLINES;
-			int lines = 0;
-			int errors = 0;
-			while(true) {
-				String line = br.readLine();
-				lines ++;
-				if(line == null) {
-					break;
+		boolean observer = true;
+		String urlString = getUrlString(jobId);
+		String lastMessage = "";
+		String thisMessage = "";
+		
+		StringBuffer message = new StringBuffer();
+		
+		message.append("id:"+jobId);
+		message.append(" ");
+		message.append("location:");
+		message.append(ManagementFactory.getRuntimeMXBean().getName());
+		info(message.toString());
+		
+		while(observer) {
+			
+			String json = getSingleLineStatus(urlString);
+			debug(json);
+			
+			if(json != null) {
+				Gson gson = new Gson();
+				MonitorInfo monitorInfo = gson.fromJson(json, MonitorInfo.class);
+
+				int stateCount = monitorInfo.stateSequence.size();
+				debug("states:"+stateCount);
+				if(stateCount <= 0) {
+					message = new StringBuffer();
+					message.append("id:"+jobId);
+					message.append(" ");
+					message.append("state:"+NotFound);
+					thisMessage = message.toString();
+					info(thisMessage);
+					return rc.get();
 				}
-				String tline = line.trim();
-				if(tline.length() == 0) {
-					continue;
+				
+				String state = "";
+				Iterator<String> states = monitorInfo.stateSequence.iterator();
+				while(states.hasNext()) {
+					state = states.next();
 				}
-				String[] tokens = tline.split(" ");
-				if(tokens.length > 5) {
-					if(tokens[5].trim().equals("ERROR")) {
-						errors++;
-					}
+				
+				message = new StringBuffer();
+				message.append("id:"+jobId);
+				message.append(" ");
+				message.append("state:"+state);
+				
+				if(state.equals(StateRunning)) {
+					message.append(details(monitorInfo));
+				}
+				else if(state.equals(StateCompleting)) {
+					cancel_job_on_interrupt.set(false);
+					message.append(details(monitorInfo));
 				}
-				info(prefix+line);
-				if(lines > maxLines) {
-					info(prefix+"more...");
-					break;
+				else if(state.equals(StateCompleted)) {
+					cancel_job_on_interrupt.set(false);
+					message.append(details(monitorInfo));
 				}
-				if(errors > maxErrors) {
-					info(prefix+"more...");
-					break;
+				
+				thisMessage = message.toString();
+				if(!thisMessage.equals(lastMessage)) {
+					info(thisMessage);
+					lastMessage = thisMessage;
+				}
+				
+				if(state.equals(StateCompleted)) {
+					if(monitorInfo.procs.equals("0")) {
+						if(monitorInfo.total.equals(monitorInfo.done)) {
+							message = new StringBuffer();
+							message.append("id:"+jobId);
+							message.append(" ");
+							message.append("rc:"+RC_SUCCESS);
+							thisMessage = message.toString();
+							info(thisMessage);
+							rc.set(RC_SUCCESS);
+							return rc.get();
+						}
+						else {
+							message = new StringBuffer();
+							message.append("id:"+jobId);
+							message.append(" ");
+							message.append("rc:"+RC_FAILURE);
+							thisMessage = message.toString();
+							info(thisMessage);
+							rc.set(RC_FAILURE);
+							return rc.get();
+						}
+					}
 				}
 			}
-			br.close();
-			dis.close();
-			fis.close();
-		}
-		catch(Exception e) {
-			debug(e);
-		}
-	}
-	
-	private void showErrors() {
-		if(job != null) {
-			StringBuffer sb = new StringBuffer();
-			sb.append(job.getLogDirectory());
-			if(!job.getLogDirectory().endsWith(File.separator)) {
-				sb.append(File.separator);
+			else {
+				error("error: timeout accessing "+urlString);
 			}
-			sb.append(job.getDuccId().getFriendly()+File.separator);
-			sb.append("jd.err.log");
+
 			try {
-				String fileName = sb.toString();
-				File file = new File(fileName);
-				if(file.canRead()) {
-					dumpFile(fileName);
-				}
-			}
-			catch(Exception e) {
+				Thread.sleep(wakeupInterval);
+			} catch (InterruptedException e) {
 				debug(e);
 			}
 		}
+		
+		return rc.get();
 	}
 	
 	private class Killer extends Thread {
@@ -497,7 +407,7 @@ public class DuccJobMonitor extends Ducc
 		
 		public void run() {
 			StringBuffer message = new StringBuffer();
-			if(cancel_job_on_interrupt) {
+			if(cancel_job_on_interrupt.get()) {
 				message.append("killer: cancel");
 				cancel();
 			}
@@ -513,8 +423,8 @@ public class DuccJobMonitor extends Ducc
        		ArrayList<String> arrayList = new ArrayList<String>();
        		arrayList.add("--"+DuccUiConstants.name_job_id);
        		arrayList.add(jobId);
-       		arrayList.add("--"+DuccUiConstants.name_service_broker);
-       		arrayList.add(broker);
+       		arrayList.add("--"+DuccUiConstants.name_reason);
+       		arrayList.add("\"submitter was terminated via interrupt\"");
        		String[] argList = arrayList.toArray(new String[0]);
     		DuccJobCancel duccJobCancel = new DuccJobCancel();
     		int retVal = duccJobCancel.run(argList);
@@ -526,7 +436,6 @@ public class DuccJobMonitor extends Ducc
     		duccMessageProcessor.exception(e);
     	}
 	}
-
 	public static void main(String[] args) {
 		try {
 			DuccJobMonitor duccJobMonitor = new DuccJobMonitor();