You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:02:19 UTC

svn commit: r1427906 [3/5] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-agent: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/agent/ main/java/org/apache/u...

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.event;
+
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.agent.Agent;
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.ProcessLifecycleController;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.DuccJobsStateEvent;
+import org.apache.uima.ducc.transport.event.ProcessPurgeDuccEvent;
+import org.apache.uima.ducc.transport.event.ProcessStartDuccEvent;
+import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+
+@Qualifier(value = "ael")
+
+public class AgentEventListener implements DuccEventDelegateListener {
+	DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
+	ProcessLifecycleController lifecycleController = null;
+	
+	private NodeAgent agent;
+	public AgentEventListener(NodeAgent agent, ProcessLifecycleController lifecycleController) {
+		this.agent = agent;
+		this.lifecycleController = lifecycleController;
+	}
+	public AgentEventListener(NodeAgent agent) {
+		this.agent = agent;
+	}
+	public void setDuccEventDispatcher( DuccEventDispatcher eventDispatcher ) {
+		//this.eventDispatcher = eventDispatcher;
+	}
+	private void reportIncomingStateForThisNode(DuccJobsStateEvent duccEvent) throws Exception {
+    StringBuffer sb = new StringBuffer();
+    
+	  for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
+      if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
+        IDuccProcess process = jobDeployment.getJdProcess();
+        sb.append("\nJD--> JobId:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
+      }
+      for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
+        if ( isTargetNodeForProcess(process) ) {
+          sb.append("\n\tJob ID:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
+        }
+      }
+    }
+    logger.info("reportIncomingStateForThisNode",null,sb.toString());
+	}
+	/**
+	 * This method is called by Camel when PM sends DUCC state to agent's queue. It 
+	 * takes responsibility of reconciling processes on this node. 
+	 * 
+	 * @param duccEvent - contains list of current DUCC jobs. 
+	 * @throws Exception
+	 */
+	public void onDuccJobsStateEvent(@Body DuccJobsStateEvent duccEvent) throws Exception {
+		//  typically lifecycleController is null and the agent assumes the role. For jUnit testing though, 
+	  //  a different lifecycleController is injected to facilitate black box testing
+	  if ( lifecycleController == null ) {
+			lifecycleController = agent;
+		}
+	  // Recv'd ducc state update, restart process reaper task which detects missing
+	  // OR state due to a network problem. 
+//	  try {
+//	    agent.restartProcessReaperTimer();
+//	  } catch( Exception e) {
+//	    logger.error("onDuccJobsStateEvent", null, "", e);
+//	  }
+	  
+		try {
+		  //  print JP report targeted for this node
+		  reportIncomingStateForThisNode(duccEvent);
+		  
+		  List<DuccUserReservation> reservations = 
+		           duccEvent.getUserReservations();
+		   agent.setReservations(reservations);
+			//	Stop any process that is in this Agent's inventory but not associated with any
+			//  of the jobs we just received
+			agent.takeDownProcessWithNoJob(agent,duccEvent.getJobList());
+			//	iterate over all jobs and reconcile those processes that are assigned to this agent. First,
+			//  look at the job's JD process and than JPs.
+			for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
+				//	check if this node is a target for this job's JD 
+				if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
+					// agent will check the state of JD process and either start, stop, or take no action
+					ICommandLine jdCommandLine = jobDeployment.getJdCmdLine();
+					if(jdCommandLine != null) {
+						agent.reconcileProcessStateAndTakeAction(lifecycleController, jobDeployment.getJdProcess(), jobDeployment.getJdCmdLine(), 
+							jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+					}
+					else {
+						logger.error("onDuccJobsStateEvent", null, "job is service");
+					}
+				} 
+				// check JPs
+				for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
+					if ( isTargetNodeForProcess(process) ) {
+	          // agent will check the state of JP process and either start, stop, or take no action 
+						agent.reconcileProcessStateAndTakeAction(lifecycleController, process, jobDeployment.getJpCmdLine(), 
+								jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+					}
+				}
+			}
+		} catch( Exception e ) {
+			logger.error("onDuccJobsStateEvent", null, e);
+		}
+	}
+	/**
+	 * Wrapper method for Utils.isTargetNodeForMessage()
+	 * 
+	 * @param process
+	 * @return
+	 * @throws Exception
+	 */
+	private boolean isTargetNodeForProcess( IDuccProcess process ) throws Exception {
+		boolean retVal = false;
+		if(process != null) {
+			retVal = Utils.isTargetNodeForMessage(process.getNodeIdentity().getIp(),agent.getIdentity().getIp());
+		}
+		return retVal;
+	}
+	public void onProcessStartEvent(@Body ProcessStartDuccEvent duccEvent) throws Exception {
+		//	iterate given ProcessMap and start a Process if this node is a target
+		for( Entry<DuccId, IDuccProcess> processEntry : duccEvent.getProcessMap().entrySet()) {
+			//	check if this Process should be launched on this node. A process map
+			//	may contain processes not meant to run on this node. Each Process instance
+			//  in a Map has a node assignment. Only if this assignment matches this node
+			//  the agent will start a process.
+			if ( Utils.isTargetNodeForMessage(processEntry.getValue().getNodeIdentity().getIp(),agent.getIdentity().getIp()) ) { 
+				logger.info(">>> onProcessStartEvent", null,"... Agent ["+agent.getIdentity().getIp()+"] Matches Target Node Assignment:"+processEntry.getValue().getNodeIdentity().getIp()+" For Share Id:"+  processEntry.getValue().getDuccId());
+				agent.doStartProcess(processEntry.getValue(),duccEvent.getCommandLine(), duccEvent.getStandardInfo(), duccEvent.getDuccWorkId());
+                if ( processEntry.getValue().getProcessType().equals(ProcessType.Pop)) {
+                	break; // there should only be one JD process to launch
+                } else {
+    				continue;
+                }
+			} 
+		}
+	}
+	public void onProcessStopEvent(@Body ProcessStopDuccEvent duccEvent) throws Exception {
+		for( Entry<DuccId, IDuccProcess> processEntry : duccEvent.getProcessMap().entrySet()) {
+			if ( Utils.isTargetNodeForMessage(processEntry.getValue().getNodeIdentity().getIp(), agent.getIdentity().getIp()) ) {
+				logger.info(">>> onProcessStopEvent", null,"... Agent Received StopProces Event - Process Ducc Id:"+processEntry.getValue().getDuccId()+" PID:"+processEntry.getValue().getPID());
+				agent.doStopProcess(processEntry.getValue());
+			}
+		}
+	}
+	public void onProcessStateUpdate(@Body ProcessStateUpdateDuccEvent duccEvent) throws Exception {
+		logger.info(">>> onProcessStateUpdate", null,"... Agent Received ProcessStateUpdateDuccEvent - Process State:"+duccEvent.getState()+" Process ID:"+duccEvent.getDuccProcessId());
+//		agent.updateProcessStatus(duccEvent.getDuccProcessId(), duccEvent.getPid(), duccEvent.getState());
+		agent.updateProcessStatus(duccEvent);
+	}
+	public void onProcessPurgeEvent(@Body ProcessPurgeDuccEvent duccEvent) throws Exception {
+		logger.info(">>> onProcessPurgeEvent", null,"... Agent Received ProcessPurgeDuccEvent -"+" Process ID:"+duccEvent.getProcess().getPID());
+		agent.purgeProcess(duccEvent.getProcess());
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.event;
+
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+
+public interface ProcessLifecycleObserver {
+	public void onProcessExit(IDuccProcess process);
+	public void onJPInitTimeout(IDuccProcess process);
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.exceptions;
+
+public class DuccNodeException extends Exception {
+
+  private static final long serialVersionUID = -5548490125536576177L;
+  public DuccNodeException(String msg) {
+    super(msg);
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.exceptions;
+
+public class DuccNodeInitializationException extends DuccNodeException {
+
+  private static final long serialVersionUID = -2742893598112795349L;
+  public DuccNodeInitializationException(String msg) {
+    super(msg);
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+public abstract class CommandExecutor implements Callable<Process> {
+	protected Process managedProcess = null;
+	protected String host;
+	protected String ip;
+    protected ICommandLine cmdLine;
+    protected NodeAgent agent;
+	public abstract void stop() throws Exception;
+
+	public abstract Process exec(ICommandLine commandLine,
+			Map<String, String> processEnv) throws Exception;
+
+	public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host, String ip, Process managedProcess)
+			throws Exception {
+		this.agent = agent;
+		this.host = host;
+		this.ip = ip;
+		this.managedProcess = managedProcess;
+		this.cmdLine = cmdLine;
+	}
+	/**
+	 * Called after process is launched. Assign PID, state and finally drain
+	 * process streams. 
+	 * 
+	 * @param process - launched process
+	 */
+	protected void postExecStep(java.lang.Process process, DuccLogger logger, boolean isKillCmd) {
+		if ( !isKillCmd ) {
+			int pid = Utils.getPID(process);
+			if ( pid != -1 ) {
+	    	    ((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
+	    	    if ( !((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process) ) {
+	    	    	//	For non uima as processes assume the process is Running after launch
+	    	    	if ( !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
+	    	    		((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Running);
+	    	    	}
+	    	    }
+			} 
+		}
+	
+		//	Drain process streams in dedicated threads.
+		((ManagedProcess) managedProcess).drainProcessStreams(process, logger, System.out, isKillCmd);
+	}
+	/**
+	 * Called by Executor to exec a process. 
+	 */
+	public Process call() throws Exception {
+		Process deployedProcess = null;
+		try {
+			//ICommandLine commandLine = ((ManagedProcess) managedProcess).getCommandLine();
+			Map<String, String> env = new HashMap<String, String>();
+			if ( !isKillCommand(cmdLine)) {
+	      //  Enrich environment for the new process. Via these settings the UIMA AS
+	      //  service wrapper can notify the agent of its state.
+	      env.put("IP", ip);
+	      env.put("NodeName", host);
+	      //  Add process unique ducc id to correlate process state updates
+	      env.put("ProcessDuccId", ((ManagedProcess) managedProcess).getDuccId().getUnique()); 
+	      if ( ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
+	        ((ManagedProcess) managedProcess).startInitializationTimer();
+	      }
+			}
+			deployedProcess = exec(cmdLine, env);
+		} catch (Exception e) {
+			e.printStackTrace();
+		} finally {
+			if ( ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
+				((ManagedProcess) managedProcess).stopInitializationTimer();
+			}
+		}
+		return deployedProcess;
+	}
+	 protected boolean isKillCommand(ICommandLine cmdLine) {
+	    return ( cmdLine.getExecutable() != null && 
+	            ( cmdLine.getExecutable().startsWith("/bin/kill") || 
+	              cmdLine.getExecutable().startsWith("taskkill") ) );
+	  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.cmdline.ACommandLine;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+public class DuccCommandExecutor extends CommandExecutor {
+	DuccLogger logger = DuccLogger.getLogger(this.getClass(),
+			NodeAgent.COMPONENT_NAME);
+	@SuppressWarnings("unused")
+	private static AtomicInteger nextPort = new AtomicInteger(30000);
+	
+	public DuccCommandExecutor(NodeAgent agent, ICommandLine cmdLine,String host, String ip, Process managedProcess)
+			throws Exception {
+		super(agent, cmdLine, host, ip, managedProcess);
+	}
+	public DuccCommandExecutor(ICommandLine cmdLine,String host, String ip, Process managedProcess)
+			throws Exception {
+		super(null, cmdLine, host, ip, managedProcess);
+	}
+
+	private boolean useDuccSpawn() {
+		if ( super.managedProcess.isAgentProcess() || Utils.isWindows() ) {
+			return false;
+		}
+		// On non-windows check if we should spawn the process via ducc_ling
+		String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
+		if ( useSpawn != null && useSpawn.toLowerCase().equals("true")) {
+			return true;
+		}
+		// default 
+		return false;
+	}
+
+	public Process exec(ICommandLine cmdLine, Map<String, String> processEnv)
+			throws Exception {
+		String methodName = "exec";
+		
+//		DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
+		try {
+			String[] cmd = getDeployableCommandLine(cmdLine,processEnv);			
+			if ( isKillCommand(cmdLine) ) {
+        logger.info(methodName, null, "Killing process");
+				stopProcess(cmdLine, cmd);
+			} else {
+				startProcess(cmdLine, cmd, processEnv);
+			}
+			return managedProcess;
+		} catch (Exception e) {
+		  if ( ((ManagedProcess)super.managedProcess).getDuccProcess() != null ) {
+  	    DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
+	      logger.error(methodName, duccId, ((ManagedProcess)super.managedProcess).getDuccProcess().getDuccId(), e, new Object[]{});
+		  }
+			throw e;
+		} 
+	}
+	private boolean isProcessTerminated( IDuccProcess process ) {
+		return process.getProcessState().equals(ProcessState.Stopped) ||
+				process.getProcessState().equals(ProcessState.Failed) ||
+				process.getProcessState().equals(ProcessState.FailedInitialization);
+	}
+	private void stopProcess(ICommandLine cmdLine, String[] cmd ) throws Exception {
+		String methodName = "stopProcess";
+		Future<?> future = ((ManagedProcess) managedProcess).getFuture();
+		if ( future == null ) {
+			throw new Exception("Future Object not Found. Unable to Stop Process with PID:"+((ManagedProcess) managedProcess).getPid());
+		}
+		// for stop to work, PID must be provided
+		if (((ManagedProcess) managedProcess).getDuccProcess().getPID() == null || 
+				((ManagedProcess) managedProcess).getDuccProcess().getPID().trim().length() == 0 ) {  
+			throw new Exception("Process Stop Command Failed. PID not provided.");
+		}
+		long maxTimeToWaitForProcessToStop = 60000; // default 1 minute 
+		if ( super.agent.configurationFactory.processStopTimeout != null ) {
+			maxTimeToWaitForProcessToStop = Long.valueOf(super.agent.configurationFactory.processStopTimeout);
+		}
+		try {
+			//	if the process is marked for death or still initializing or it is JD, kill it
+			if (    ((ManagedProcess) managedProcess).doKill() ||
+					((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop) ||
+					((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Initializing) ||
+          ((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Starting) ||
+					((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.FailedInitialization)) {
+				logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),">>>>>>>>>>>>>>> Killing Process:"+((ManagedProcess) managedProcess).getPid()+" .Process State:Initializing");
+				doExec(new ProcessBuilder(cmd), cmd, true);
+			} else { // send stop request to quiesce the service
+				Map<String, Object> msgHeader = new HashMap<String, Object>();
+				//	Add PID to the header. Receiving process has a PID filter in its router
+				//  to determine if the stop message is destined for it. 
+				msgHeader.put(DuccExchange.ProcessPID, ((ManagedProcess) managedProcess).getPid());		
+				msgHeader.put(DuccExchange.DUCCNODEIP, ((ManagedProcess) managedProcess).getDuccProcess().getNodeIdentity().getIp());		
+				logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"Agent Sending Stop request to remote process with PID:"+((ManagedProcess) managedProcess).getPid()+ " On Node:"+((ManagedProcess) managedProcess).getDuccProcess().getNodeIdentity().getIp());
+				try {
+	        //  Use either Mina (socket) or jms to communicate with an agent
+	        if (((ManagedProcess) managedProcess).getSocketEndpoint() != null ) {
+	          
+	          logger.info(methodName,null,"Agent Sending <<STOP>> to Endpoint:"+((ManagedProcess) managedProcess).getSocketEndpoint());
+	          //  socket transport
+	          super.
+	          agent.
+	            getEventDispatcherForRemoteProcess().
+	              dispatch( new ProcessStopDuccEvent(new HashMap<DuccId, IDuccProcess>()), 
+	                      ((ManagedProcess) managedProcess).getSocketEndpoint(), msgHeader );
+	          
+	        } else {
+	          //  jms transport
+	          super.
+	            agent.
+	              getEventDispatcherForRemoteProcess().
+	                dispatch(new ProcessStopDuccEvent(new HashMap<DuccId, IDuccProcess>()),msgHeader );
+	        }
+				} catch( Exception ex) {
+		      logger.error(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), ex, new Object[]{});
+				} finally {
+	        logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" Waiting for Process to Stop. Timout Value:"+maxTimeToWaitForProcessToStop+" millis");
+	        try {
+	          // Start Kill timer only if process is still running
+	          if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Running)) {
+	            // the following call will block!!! Waits for process to stop on its own. If it doesnt stop, kills it hard.
+	            // The exact time the service is allotted for stopping is defined in ducc.properties 
+	            logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Starting Timer For Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState());
+	            future.get(maxTimeToWaitForProcessToStop, TimeUnit.MILLISECONDS);
+	          } else {
+	            logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process Not In Running State");
+	          }
+	        } catch (TimeoutException tex) { // on time out kill the process
+	          if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Running)) {
+	            logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" to Stop. Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process did not stop in alloted time of "+maxTimeToWaitForProcessToStop+" millis");
+	            logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),">>>>>>>>>>>>>>> Killing Process:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" .Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState());
+	            doExec(new ProcessBuilder(cmd), cmd, true);
+	          } else {
+	            logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" to Stop but the process is not in a running state. Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState());
+	          }
+	        } 
+				}
+			}
+		} catch( Exception e) {  // InterruptedException, ExecutionException
+		  logger.error(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), e, new Object[]{});
+		}
+	}
+	
+	private void startProcess(ICommandLine cmdLine,String[] cmd, Map<String, String> processEnv) throws Exception {
+		String methodName = "startProcess";
+		
+		ITimeWindow twr = new TimeWindow();
+			String millis;
+			millis = TimeStamp.getCurrentMillis();
+			
+			((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowRun(twr);
+			twr.setStart(millis);
+			ProcessBuilder pb = new ProcessBuilder(cmd);
+
+			if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop)) {
+				ITimeWindow twi = new TimeWindow();
+				((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowInit(twi);
+				twi.setStart(millis);
+				twi.setEnd(millis);
+			}
+			String workingDir = null;
+			//	Set working directory if a user specified it in a job specification
+			//if ( ((ManagedProcess)super.managedProcess).getProcessInfo() != null ) {
+			//	workingDir = ((ManagedProcess)super.managedProcess).getProcessInfo().getWorkingDirectory();
+			//}
+			//if ( workingDir != null ) {
+            //		logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), "Launching process in a user provided working dir:"+workingDir);
+            //		pb.directory(new File(workingDir));
+			//}
+			Map<String, String> env = pb.environment();
+			// enrich Process environment
+			env.putAll(processEnv);
+			if( cmdLine instanceof ACommandLine ) {
+				// enrich Process environment with one from a given command line
+				env.putAll(((ACommandLine)cmdLine).getEnvironment());
+			}
+			if ( logger.isTrace()) {
+	      // <dump>
+	      Iterator<String> iterator = env.keySet().iterator();
+	      while(iterator.hasNext()) {
+	        String key = iterator.next();
+	        String value = env.get(key);
+	        String message = "key:"+key+" "+"value:"+value;
+	        logger.trace(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), message);
+	      }
+			}
+			try {
+				doExec(pb, cmd, isKillCommand(cmdLine));
+			} catch(Exception e) {
+				throw e;
+			} finally {
+				millis = TimeStamp.getCurrentMillis();
+				twr.setEnd(millis);
+			}
+	}
+	private void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd) throws Exception {
+		String methodName = "doExec";
+
+		try {
+			
+			StringBuilder sb = new StringBuilder((isKillCommand(cmdLine) ?"--->Killing Process ":"---> Launching Process:")
+					+ " Using command line:");
+			int inx=0;
+			for (String cmdPart : cmd) { 
+				sb.append("\n\t[").append(inx++).append("]").append(Utils.resolvePlaceholderIfExists(cmdPart, System.getProperties()));
+			}
+			logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), sb.toString());
+
+			java.lang.Process process = pb.start();
+			// Drain process streams
+			postExecStep(process, logger, isKillCmd);
+			// block waiting for the process to terminate.
+			process.waitFor();
+			if ( !isKillCommand(cmdLine) ) {
+				logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), ">>>>>>>>>>>>> Process with PID:"+((ManagedProcess)super.managedProcess).getDuccProcess().getPID()+" Terminated");
+			}
+		} catch( NullPointerException ex) {
+			((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
+			StringBuffer sb = new StringBuffer();
+			sb.setLength(0);  
+			sb.append("\n\tJava ProcessBuilder Failed to Launch Process due to NullPointerException. An Entry in the Command Array Must be Null. Look at Command Array Below:\n");
+			for (String cmdPart : cmd) { 
+				if ( cmdPart != null ) {
+					sb.append("\n\t").append(cmdPart);
+				} 
+			}
+			logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), sb.toString());
+			((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
+			throw ex;
+		} catch( Exception ex) {
+			((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
+			throw ex;
+		} finally {
+			//	 Per team discussion on Aug 31 2011, the process is stopped by an agent when initialization
+			//   times out or initialization failed. Both Initialization_Timeout and FailedIntialization imply
+			//   that the process is stopped.
+			if ( !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.InitializationTimeout) && 
+				 !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.FailedInitialization) &&
+				 !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Failed) && 
+        !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Killed)) { 
+				((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Stopped);
+			}
+		}
+
+	}
+
+	private String[] getDeployableCommandLine(ICommandLine cmdLine, Map<String, String> processEnv) throws Exception {
+	  String methodName = "getDeployableCommandLine";
+	  String[] cmd = new String[0];
+
+	  try {
+	    //	lock using Agent single permit semaphore. The Utils.concatAllArrays()
+	    //  uses native call (for efficiency) which appears not thread safe.
+	    NodeAgent.lock();
+	    //	Use ducc_ling (c code) as a launcher for the actual process. The ducc_ling
+	    //  allows the process to run as a specified user in order to write out logs in
+	    //  user's space as oppose to ducc space.
+	    String c_launcher_path = 
+	            Utils.resolvePlaceholderIfExists(
+	                    System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());
+
+	    //	if the command line is kill, don't provide any logging info to the ducc_ling. Otherwise, 
+	    //  ducc_ling creates and empty log for each time we are killing a process
+	    if ( isKillCommand(cmdLine) ) {
+	      // Duccling, with no logging, always run by ducc, no need for workingdir
+	      String[] duccling_nolog = new String[] { c_launcher_path,
+	              "-u", ((ManagedProcess)super.managedProcess).getOwner(),
+	      "--" };        
+	      if ( useDuccSpawn() ) {
+	        cmd = Utils.concatAllArrays(duccling_nolog, new String[] {cmdLine.getExecutable()},cmdLine.getCommandLine());
+	      } else {
+	        cmd = Utils.concatAllArrays(new String[] {cmdLine.getExecutable()},cmdLine.getCommandLine());
+	      }
+	    } else {
+	      String processType = "-UIMA-";
+	      if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop)) {
+	        processType = "-JD-";
+	      }             
+	      String processLogDir = ((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory()+
+	              (((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory().endsWith(File.separator) ? "" : File.separator)+
+	              ((ManagedProcess)super.managedProcess).getWorkDuccId()+File.separator;
+	      String processLogFile = ((ManagedProcess)super.managedProcess).getWorkDuccId()+ processType+host;            
+	      String workingDir = ((ManagedProcess)super.managedProcess).getProcessInfo().getWorkingDirectory();
+	      if ( workingDir == null ) {
+	        workingDir = "NONE";
+	      }
+
+	      // Duccling, with logging
+	      String[] duccling = new String[] { c_launcher_path,
+	              "-f", processLogDir+processLogFile, 
+	              "-w", workingDir,
+	              "-u", ((ManagedProcess)super.managedProcess).getOwner(),
+	              "--" };        
+	      //	For now, log to user's home directory
+	      //					String baseDir = 
+	      //							((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory();
+
+	      String executable = cmdLine.getExecutable(); 
+	      //	Check if user specified which java to use to launch the process. If not provided,
+	      //	use the same java that the agent is running with 
+	      if (executable == null || executable.trim().length() == 0 ) {
+	        executable = System.getProperty("java.home")+File.separator+"bin"+File.separator+"java";
+	      }
+	      List<String> operationalProperties = new ArrayList<String>();
+
+	      if ( cmdLine instanceof JavaCommandLine ) {
+	        String duccHomePath = null;
+	        if ( (duccHomePath = System.getenv("DUCC_HOME")) == null ) {
+	          duccHomePath = System.getProperty("DUCC_HOME");
+	        }
+	        operationalProperties.add("-DDUCC_HOME="+duccHomePath);
+	        operationalProperties.add("-Dducc.deploy.configuration="+ 
+	                System.getProperty("ducc.deploy.configuration"));
+	        if ( System.getProperties().containsKey("ducc.agent.managed.process.state.update.endpoint.type") ) {
+	          String type = System.getProperty("ducc.agent.managed.process.state.update.endpoint.type");
+	          if (type != null && type.equalsIgnoreCase("socket")) {
+	            operationalProperties.add("-D"+NodeAgent.ProcessStateUpdatePort+"="+System.getProperty(NodeAgent.ProcessStateUpdatePort));
+	          }
+	        }
+	        operationalProperties.add("-Dducc.process.log.dir="+processLogDir);
+	        operationalProperties.add("-Dducc.process.log.basename="+processLogFile); //((ManagedProcess)super.managedProcess).getWorkDuccId()+ processType+host);
+	        operationalProperties.add("-Dducc.job.id="+((ManagedProcess)super.managedProcess).getWorkDuccId());
+
+	      }
+	      String[] operationalPropertiesArray = new String[operationalProperties.size()];
+
+	      if ( useDuccSpawn() ) {
+	        cmd = Utils.concatAllArrays(duccling, new String[] {executable}, operationalProperties.toArray(operationalPropertiesArray), cmdLine.getCommandLine());
+	      } else {
+	        cmd = Utils.concatAllArrays(new String[] {executable}, operationalProperties.toArray(operationalPropertiesArray), cmdLine.getCommandLine());
+	      }
+	      // add JobId to the env
+	      if ( processEnv != null ) {
+	        processEnv.put("JobId", String.valueOf(((ManagedProcess)super.managedProcess).getWorkDuccId().getFriendly()));
+	        //	for now just use user.home. In the long run the LogDir may
+	        //  come from job spec
+	      }
+	    }
+	    return cmd;
+	  } catch( Exception ex) {
+	    ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
+	    throw ex;
+	  } finally {
+	    NodeAgent.unlock();
+	  }
+
+	}
+	public void stop() {
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * Parses jvm args String provided by the user and returns a List of jvm args.
+ * The parser only expects -X and -D in the jvm args String. Supports a mix 
+ * of -X and -D args in an arbitrary order.  
+ *
+ */
+public class JvmArgsParser {
+	
+	/**
+	 * Parse -X args
+	 * 
+	 * @param cmd2run - where to add -X args
+	 * @param arg - string containing zero or more -X args
+	 */
+	private static void addJvmOptions(List<String> cmd2run, String arg) {
+		if ( arg == null || arg.trim().length() == 0 ) {
+			return;
+		}
+		boolean ignoreFirstArg = false;
+		//	the arg string may start with non -X arg, in which case ignore
+		//  that first part.
+		if ( !arg.trim().startsWith("-")) {
+			ignoreFirstArg = true;
+		}
+		//	Get -X tokens
+		String[] jvm_args = arg.split("-X");
+		for( String jvm_arg : jvm_args ) {
+			//	check if the first token contains non -X part, in which case ignore it
+			if ( ignoreFirstArg ) {
+				ignoreFirstArg = false;
+				continue;
+			}
+			//	 check if this is valid -X token and add it to a given list
+			if ( jvm_arg.trim().length() > 0 ) {
+				cmd2run.add("-X"+jvm_arg.trim());  // -X was stripped by split()
+			}
+		}
+	}
+	/**
+	 * Return a List of jvm args contained in a provided args string.
+	 * It first tokenizes the args string extracting -D args. Secondly,
+	 * it parses each token and extracts -X args. 
+	 * 
+	 * @param args - string containing a mix of -X and -D args
+	 * 
+	 * @return
+	 */
+	public static List<String> parse(String args) {
+		List<String> jvm_args = new ArrayList<String>();
+		//	tokenize on -D boundaries. Produced tokens may contain both -D and -X args
+		String[] jvm_args_array = args.split("-D");
+		for (String arg : jvm_args_array) {
+			if (arg.trim().length() > 0) {
+				int start = 0;
+				arg = arg.trim();
+				//	check if the current token contains -X arg
+				if ((start = arg.indexOf("-X")) > -1) {
+					//	parse current token and add -X args to the list
+					addJvmOptions(jvm_args, arg);
+				} else {
+					jvm_args.add("-D" + arg); // -D was stripped by split()
+					continue;
+				}
+				if (start == 0) {
+					continue;
+				}
+				jvm_args.add("-D" + arg.substring(0, start).trim()); // -D was stripped by split()
+			}
+		}
+		return jvm_args;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.net.InetAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+
+
+public class Launcher {
+    private ExecutorService executorService = null;
+    private DuccIdFactory duccIdFactory = new DuccIdFactory();
+    public Launcher() {
+    	executorService = Executors.newCachedThreadPool();
+    }
+    /**
+     * This method is used to launch multiple Agents on the same physical machine. It allows
+     * to scale up Agents on a single machine to simulate load. Each Agent instance will 
+     * assume a given name and IP address.
+     * 
+     * @param cmdLine - defines the command line used to exec Agent process
+     * @param howMany - how many Agents to exec
+     * @param ip - initial IP address to assign to first Agent instance. Subsequent Agent instances
+     *             will have this IP incremented as in XXX.XXX.XXX.XX+1,XXX.XXX.XXX.XX+2, etc
+     * @param nodeName - name of the physical machine. The Agents will be assigned this name plus 
+     *                   a suffix as in watson-1, watson-2, etc
+     * @throws Exception
+     */
+    public void start(ICommandLine cmdLine, int howMany, String ip, String nodeName) throws Exception {
+        //	Launch as many agents as requested  
+        for( int i=0; i < howMany; i++ ) {
+            String host = new String(nodeName);
+            //	Append suffix to node name for each Agent instance
+            if ( host.indexOf(".") > -1 ) {
+                String tmp = host.substring(0,host.indexOf("."));
+                host = tmp+"-"+String.valueOf(i+1)+host.substring(host.indexOf("."));
+            } else {
+                host = host+"-"+String.valueOf(i+1);
+            }
+            launchProcess(host, ip, cmdLine);
+            //	Increment IP address for the next Agent
+            int uip = Integer.parseInt(ip.substring(ip.lastIndexOf(".")+1, ip.length()));
+            ip = ip.substring(0,ip.lastIndexOf(".")+1).concat(String.valueOf(uip+1));
+        }
+
+    }
+	
+    /**
+     * Submit request to exec a process. The process will be exec'd in a separate thread.
+     * 
+     * @param process
+     * @param commandLine
+
+     * @throws Exception
+     */
+    public ManagedProcess launchProcess(NodeAgent agent, NodeIdentity nodeIdentity,IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, ManagedProcess managedProcess) 
+    		throws Exception {
+        //	Instantiate executor that will actually exec the process using java's ProcessBuilder
+        DuccCommandExecutor executor = 
+            new DuccCommandExecutor(agent, commandLine, nodeIdentity.getName(),nodeIdentity.getIp(), managedProcess);
+        Future<?> future = executorService.submit(executor);
+        //	if we are launching a process, save the future object returned from Executor above
+    	managedProcess.setFuture(future);
+        return managedProcess;
+    }
+    /**
+     * This method is used to testing only. It enables launching an agent with 
+     * a given name and IP address which are different from a physical node name
+     * and IP address. With that multiple agents can be launched on the same 
+     * physical machine simulating a cluster of nodes.
+     * 
+     */
+    public void launchProcess(String host, String ip, ICommandLine cmdLine) throws Exception {
+        IDuccProcess process = 
+            new DuccProcess(duccIdFactory.next(), new NodeIdentity(ip, host));
+        process.setProcessType(ProcessType.Pop);
+        ManagedProcess managedProcess = new ManagedProcess(process, cmdLine, true);
+        DuccCommandExecutor executor = 
+            new DuccCommandExecutor(cmdLine, host, ip, managedProcess);
+        executorService.submit(executor);
+    }
+    public static void main(String[] args) {
+        try {
+            int howMany = Integer.parseInt(args[0]);   // how many agent processes to launch
+            String ip = System.getProperty("IP");
+            String nodeName = InetAddress.getLocalHost().getHostName();
+            Launcher launcher = new Launcher();
+            JavaCommandLine cmdLine = new JavaCommandLine("java");
+            String duccHome = System.getenv("DUCC_HOME");
+            cmdLine.addOption("-Dducc.deploy.configuration="+duccHome+"/resources/ducc.properties");
+            cmdLine.addOption("-Dducc.deploy.components=agent");
+            cmdLine.addOption("-Djava.library.path=" + duccHome +"/lib/sigar");
+            cmdLine.addOption("-DDUCC_HOME=" + duccHome);
+			
+            // System.out.println("Spawning with classpath: \n" + System.getProperty("java.class.path"));
+            // cmdLine.setClasspath(duccHome+"/lib/*:" + 
+            //                      duccHome+"/lib/apache-activemq-5.5.0/*:" + 
+            //                      duccHome+"/lib/apache-camel-2.7.1/*:" + 
+            //                      duccHome+"/lib/commons-collections-3.2.1/*:" + 
+            //                      duccHome+"/lib/apache-commons-lang-2.6/*:" + 
+            //                      duccHome+"/lib/apache-log4j-1.2.16/*:" + 
+            //                      duccHome+"/lib/guava-r09/*:" + 
+            //                      duccHome+"/lib/joda-time-1.6/*:" + 
+            //                      duccHome+"/lib/sigar/*:" + 
+            //                      duccHome+"/lib/springframework-3.0.5/*:");
+
+            cmdLine.setClasspath(System.getProperty("java.class.path"));
+            cmdLine.setClassName("org.apache.uima.ducc.agent.common.main.DuccService");
+            launcher.start(cmdLine, howMany, ip, nodeName);
+        } catch( Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
+import org.apache.uima.ducc.agent.launcher.ManagedServiceInfo.ServiceState;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+
+public class ManagedProcess implements Process {
+	
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	
+	private static final String JMXPortProperty = "com.sun.management.jmxremote.port"; 
+	/*
+	 * Process States
+	 *                     +------- STARTING
+	 *                    /           |
+	 *                   /            |
+	 *                  V             V
+	 *         INITIALIZING ------> FAILED 
+	 *               |             /
+	 *               |            /
+	 *               V           /
+	 *              READY ------>
+	 *               |
+	 *               |
+	 *               V
+	 *            STOPPED    
+	 */
+	private ServiceState state= ServiceState.STARTING;
+	public static enum ProcessType {
+		UIMA_SERVICE{}, 
+		APPLICATION{}, 
+		LAUNCHER{};
+	};
+
+
+	public static enum StopPriority {
+		KILL_9, QUIESCE_AND_STOP
+	}
+	private volatile boolean destroyed=false;
+	private IDuccProcess duccProcess;
+	//	Used to kill a process after it reports its PID. 
+	private volatile boolean killAfterLaunch=false;
+	private String pid;
+	private String processCorrelationId;
+	// JMX Port
+	private String port = null;
+	// Service socket endpoint 
+	private String socketEndpoint = null;
+	
+    private int tgCounter=1;
+	private String owner;
+	//	log path for http access
+	private String logPath;
+	//	absolute path to the process log
+	private String absoluteLogPath;
+
+	private String agentLogPath;
+	private boolean agentProcess = false;
+    private String nodeIp;
+    private String nodeName;
+    private String description;
+    private String parent;
+    
+	private ProcessType processType;
+	private volatile boolean attached;
+	private String clientId;
+//	private volatile boolean killed;
+	private Throwable exceptionStackTrace;
+  //	The following are transients to prevent serialization when sending heartbeat updates
+	//	to the controller. This class exports its state via AgentManagedProcess interface.
+	private transient List<String> command;
+	private transient java.lang.Process process;
+
+  protected transient ProcessStreamConsumer stdOutReader;
+	protected transient ProcessStreamConsumer stdErrReader;
+	private transient OutputStream processLogStream = null;
+	private transient CountDownLatch pidReadyCount = new CountDownLatch(1);
+
+	private ICommandLine commandLine;
+	
+	private ProcessLifecycleObserver observer;
+	
+	private Timer initTimer;
+	
+	// timer used to cleanup UIMA pipeline initializations stats
+  private Timer cleanupTimer;
+
+  private DuccId workDuccId;
+	
+	private IDuccStandardInfo processInfo;
+	
+	private transient Future<?> future;
+	
+	private transient volatile boolean isstopping;
+	
+	private transient volatile boolean forceKill;
+	
+	private transient DuccLogger logger;
+	
+	private long processMemoryAssignment;
+	
+	public ManagedProcess(IDuccProcess process, ICommandLine commandLine) {
+		this(process, commandLine, null, null,0);
+	}
+
+	public ManagedProcess(IDuccProcess process, ICommandLine commandLine,boolean agentProcess) {
+		this(process, commandLine, null, null,0);
+		this.agentProcess = agentProcess;
+	}
+
+	public ManagedProcess(IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, DuccLogger logger, long processMemoryAssignment) {
+		this.commandLine = commandLine;
+		this.duccProcess = process;
+		this.observer = observer;
+		this.logger = logger;
+		this.processMemoryAssignment = processMemoryAssignment;
+	}
+	public ManagedProcess( java.lang.Process process, boolean agentProcess, String correlationId ) {
+		this.process = process;
+		this.agentProcess = agentProcess;
+	}
+	public ManagedProcess( java.lang.Process process, boolean agentProcess ) {
+		this(process, agentProcess, null );
+	}
+	public boolean doKill() {
+		return forceKill;
+	}
+	public void kill() {
+		forceKill = true;
+	}
+	public void setStopping() {
+		isstopping = true;
+	}
+	public boolean isStopping() {
+		return isstopping;
+	}
+	public void setWorkDuccId( DuccId workDuccId ) {
+		this.workDuccId = workDuccId;
+	}
+	public DuccId getWorkDuccId() {
+		return this.workDuccId;
+	}
+	public DuccId getDuccId() {
+		return duccProcess.getDuccId();
+	}
+	public void setParent( String parent) {
+		this.parent = parent;
+	}
+	public String getParent() {
+		return parent;
+	}
+	
+	public void setClientId(String clientId){
+		this.clientId = clientId;
+	}
+    public String getClientId() {
+    	return clientId;
+    }
+	public boolean isAttached() {
+		return attached;
+	}
+
+	public void setAttached() {
+		this.attached = true;
+	}
+	
+	public String getNodeName() {
+		return nodeName;
+	}
+
+	public void setNodeName(String nodeName) {
+		this.nodeName = nodeName;
+	}
+	
+	public String getNodeIp() {
+		return nodeIp;
+	}
+
+	public void setNodeIp(String nodeIp) {
+		this.nodeIp = nodeIp;
+	}
+
+	/**
+	 * Return current state of this object.
+	 * 
+	 * @return
+	 */
+	public Process getInstance() {
+		return this;
+	}
+	public ProcessType getProcessType() {
+		return processType;
+	}
+	public void setProcessType(ProcessType processType) {
+		this.processType = processType;
+	}
+	public void setLogStream(OutputStream os ) {
+		processLogStream = os;
+	}
+	public boolean isAgentProcess() {
+		return agentProcess;
+	}
+	/**
+	 * @param uimaLogPath the uimaLogPath to set
+	 */
+	public void setLogPath(String logPath) {
+		this.logPath = logPath;
+	}
+
+	/**
+	 * @return the stdErrLogPath
+	 */
+	public String getLogPath() {
+		return logPath;
+	}
+	public void setAgentLogPath(String logPath) {
+		this.agentLogPath = logPath;
+	}
+
+	/**
+	 * @return the stdErrLogPath
+	 */
+	public String getAgentLogPath() {
+		return agentLogPath;
+	}
+	/**
+	 * @return the owner
+	 */
+	public String getOwner() {
+		if ( processInfo != null ) {
+			return processInfo.getUser();
+		}
+		return null;
+	}
+
+
+	public void setOSProcess(java.lang.Process process ) {
+		this.process = process;
+	}
+	private void log( String method, String message ) {
+	  if ( logger != null ) {
+	    logger.info(method, null, message);
+	  }
+	}
+	public void drainProcessStreams(java.lang.Process process , DuccLogger logger, PrintStream pStream, boolean isKillCmd ) { // InputStream outputStream, InputStream errorStream) {
+
+		//	Create dedicated Thread Group for the process stream consumer threads
+		ThreadGroup group = new ThreadGroup("AgentDeployer" + tgCounter++);
+		//	Fetch stdin from the deployed process
+		InputStream stdin = process.getInputStream();
+		//	Fetch stderr from the deployed process
+		InputStream stderr = process.getErrorStream();
+		//	Create dedicated thread to consume std output stream from the process 
+		stdOutReader = new ProcessStreamConsumer(logger, group, "StdOutputReader", stdin, pStream);
+		//	Create dedicated thread to consume std error stream from the process 
+		stdErrReader = new ProcessStreamConsumer(logger,  group, "StdErrorReader", stderr, pStream);
+		
+		//	Start both stream consumer threads
+		stdOutReader.start();
+		stdErrReader.start();
+		//	block until the process is terminated or the agent terminates
+		boolean finished = false;
+		while (!finished) {
+			try {
+				process.waitFor();
+			} catch (InterruptedException e) {
+			}
+			try {
+				process.exitValue();
+				finished = true;
+			} catch (IllegalThreadStateException e) {
+			}
+		}
+		try {
+		  // wait for stdout and stderr reader threads to finish. Join for max of 2 secs 
+      // The process has exited and in theory the join should return quickly.
+		  // We do the join to make sure that the streams are drained so that we
+		  // can get a reason for failure if there was a problem launching the process.
+		  stdOutReader.join(2000);
+	    stdErrReader.join(2000);
+		} catch( InterruptedException ie) {
+		  log("ManagedProcess.drainProcessStreams","Interrupted While Awaiting Termination of StdOutReader and StdErrReader Threads");
+		}
+		
+		String reason = 
+		        getDuccProcess().getReasonForStoppingProcess();
+		
+		ProcessState pstate = getDuccProcess().getProcessState();
+
+		if ( isKillCmd || 
+		        // if the process is to be killed due to init problems, set the state to Stopped
+		        ( reason != null &&
+		            (
+		               reason.equals(ReasonForStoppingProcess.FailedInitialization.toString() ) || 
+		               reason.equals(ReasonForStoppingProcess.InitializationTimeout.toString() )
+		            ) 
+		       )
+		   ) {
+		  getDuccProcess().setProcessState(ProcessState.Stopped);
+		  
+		} else {
+	    // Process has terminated. Determine why the process terminated.
+      log("ManagedProcess.drainProcessStreams", "Ducc Process with PID:"+getPid() +" Terminated while in "+pstate+" State");
+	    
+	    //  true if agent killed the process. Process either exceeded memory use or
+	    //  the PM state notifications stopped coming in.
+	    if ( doKill() ) {
+        // Agent killed the process due to timeout waiting for OR state
+        pstate = ProcessState.Killed;
+	    } else {
+	      // check if the process died due to an external cause. If there was the
+	      // case isstopping = false. The isstopping=true iff the Agent initiated
+	      // process stop because the process was deallocated
+	      if ( !isstopping) {
+	        //  unexpected process death. Killed by the OS or admin user with kill -9
+	        pstate = ProcessState.Failed;
+	        // fetch errors from stderr stream. If the process failed to start due to misconfiguration
+	        // the reason for failure would be provided by the OS (wrong user id, bad directory,etc) 
+	        String errors = stdErrReader.getDataFromStream();
+	        if ( errors.trim().length() > 0 ) {
+	          getDuccProcess().setReasonForStoppingProcess(errors.trim());
+	        } else {
+	          getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.Croaked.toString());
+	        }
+	      } else {
+	        // if the process was stopped due InitializationFailure or InitializationTimeout
+	        // the following method will be a noop.
+          addReasonForStopping(getDuccProcess(),ReasonForStoppingProcess.Deallocated.toString());
+	        pstate = ProcessState.Stopped;
+	      }
+	      notifyProcessObserver(pstate);
+	      log("ManagedProcess.drainProcessStreams","************ Remote Process PID:"+getPid()+" Terminated ***************");
+	    }
+		}
+	}
+	private void addReasonForStopping(IDuccProcess process, String reason) {
+	  if ( getDuccProcess().getReasonForStoppingProcess() == null || getDuccProcess().getReasonForStoppingProcess().trim().length() == 0 ) {
+	    getDuccProcess().setReasonForStoppingProcess(reason);
+	  }
+	}
+	/**
+	 * @return the state
+	 */
+	public ServiceState getStatus() {
+		return state; 
+	}
+
+	/**
+	 * @return the pid
+	 */
+	public String getPid() {
+		return duccProcess.getPID();
+	}
+	/**
+	 * @param pid the pid to set
+	 */
+	public void setPid(String pid) {
+		this.pid = pid;
+		duccProcess.setPID(pid);
+		pidReadyCount.countDown();
+	}
+	public void awaitPid() {
+		try {
+			pidReadyCount.await();
+		} catch( Exception e) {
+		}
+	}
+	public void releasePidLatch() {
+		pidReadyCount.countDown();
+	}
+	/**
+	 * @return the command
+	 */
+	public List<String> getCommand() {
+		return command;
+	}
+	/**
+	 * @param command the command to set
+	 */
+	/**
+	 * @param command the command to set
+	 */
+	public void setCommand(List<String> commandToRun) {
+		this.command = commandToRun;
+	}
+	
+	/**
+	 * @return the processCorrelationId
+	 */
+	public String getProcessId() {
+		return processCorrelationId;
+	}
+	/**
+	 * @param processCorrelationId the processCorrelationId to set
+	 */
+	public void setProcessId(String processId) {
+		this.processCorrelationId = processId;
+	}
+	/**
+	 * @return the jmxPort
+	 */
+	public String getPort() {
+		return port;
+	}
+	/**
+	 * @param jmxPort the jmxPort to set
+	 */
+	public void setPort(String port) {
+		this.port = port;
+	}
+	public void terminateRemoteProcess() throws Exception {
+		System.out.println("Stopping Remote Managed Process via JMX");
+		//	Using JMX instruct the service to terminate	
+//		if ( processMBean != null ) {
+//			processMBean.getManagedServiceMBean().terminate();
+//		}
+	}
+	public void failed() {
+//		setStatus(ServiceState.FAILED);
+		cleanup();
+//		managedProcessInfo.setState(state);
+//		if ( getEndDate() == null ) {
+//			setEndDate(new DateTime());
+//		}
+	}
+	private void cleanup() {
+		if ( process != null ) {
+			process.destroy();
+		}
+		destroyed = true;
+		
+	}
+	public void stop() {
+//	  if ( getStatus() != ServiceState.FAILED ) {
+//	    setStatus(ServiceState.STOPPED);
+//	  }
+		cleanup();
+//		if ( getEndDate() == null ) {
+//			setEndDate(new DateTime());
+//		}
+//		if ( process != null ) {
+//			process.destroy();
+//		}
+//		destroyed = true;
+	}
+	public void waitFor() throws InterruptedException {
+		process.waitFor();
+	}
+	
+    public boolean isDestroyed() {
+    	return destroyed;
+    }
+	public String getAbsoluteLogPath() {
+		return absoluteLogPath;
+	}
+	public void setAbsoluteLogPath(String absoluteLogPath) {
+		this.absoluteLogPath = absoluteLogPath;
+	}
+	public String getDescription() {
+		return description;
+	}
+	public void setDescription(String description) {
+		this.description = description;
+	}
+	 public Throwable getExceptionStackTrace() {
+	    return exceptionStackTrace;
+	  }
+	  public void setExceptionStackTrace(Throwable exceptionStackTrace) {
+	    this.exceptionStackTrace = exceptionStackTrace;
+	  }
+	/**
+	 * @return the processSpecification
+	 */
+	public ICommandLine getCommandLine() {
+		return commandLine;
+	}
+	/**
+	 * @return the duccProcess
+	 */
+	public IDuccProcess getDuccProcess() {
+		return duccProcess;
+	}
+	public void notifyProcessObserver(ProcessState state) {
+		if ( observer != null && getDuccProcess() != null ) {
+			if ( ProcessState.InitializationTimeout.equals(state)) {
+        observer.onJPInitTimeout(getDuccProcess());
+			} else {
+	      getDuccProcess().setProcessState(state);
+	      observer.onProcessExit(getDuccProcess());
+			}
+		}
+	}
+	public void startInitializationTimer() {
+		initTimer = new Timer();
+		long timeout = 7200*1000;  // default timeout after 2 hours of initialization
+		try {
+			String str_timeout;
+			if ( (str_timeout = System.getProperty("ducc.agent.launcher.process.init.timeout")) != null ) {
+				timeout = Long.parseLong(str_timeout);
+			}
+		} catch( NumberFormatException e) {
+			
+		}
+		initTimer.schedule( new InitializationTask(), timeout);  // timeout after 2 hours of initialization
+	}
+
+	/**
+	 * Stops service initialization timer and starts a new timer that will cleanup
+	 * UIMA pipeline initialization stats. These states are only needed during the
+	 * initialization of a service.
+	 */
+	public void stopInitializationTimer() {
+		if ( initTimer != null ) {
+			initTimer.cancel();
+	    long timeout = 60000;  // default timeout after 60 seconds
+			if ( getDuccProcess().getUimaPipelineComponents() != null && getDuccProcess().getUimaPipelineComponents().size() > 0) {
+			  cleanupTimer = new Timer();
+		    try {
+		      String inv_publish_rate;
+		      if ( (inv_publish_rate = System.getProperty("ducc.agent.node.inventory.publish.rate")) != null ) {
+		        timeout = Long.parseLong(inv_publish_rate)*4;  // allow sufficient time to publish init stats
+		      }
+		    } catch( NumberFormatException e) {
+		    }
+		    cleanupTimer.schedule( new CleanupTask(), timeout);  // timeout and remove UIMA pipeline stats
+			}
+		}
+	}
+	private class InitializationTask extends TimerTask {
+		public void run() {
+			initTimer.cancel();
+      notifyProcessObserver(ProcessState.InitializationTimeout);
+		}
+	}
+  private class CleanupTask extends TimerTask {
+    public void run() {
+      cleanupTimer.cancel();
+      ((DuccProcess)getDuccProcess()).setUimaPipelineComponents(null);
+    }
+  }
+
+  public IDuccStandardInfo getProcessInfo() {
+		return processInfo;
+	}
+
+	public void setProcessInfo(IDuccStandardInfo processInfo) {
+		this.processInfo = processInfo;
+	}
+
+	public boolean killAfterLaunch() {
+		return killAfterLaunch;
+	}
+
+	public void killAfterLaunch(boolean killAfterLaunch) {
+		this.killAfterLaunch = killAfterLaunch;
+	}
+
+	public Future<?> getFuture() {
+		return future;
+	}
+
+	public void setFuture(Future<?> future) {
+		this.future = future;
+	}
+
+  public long getProcessMemoryAssignment() {
+    return processMemoryAssignment;
+  }
+
+  public String getSocketEndpoint() {
+    return socketEndpoint;
+  }
+
+  public void setSocketEndpoint(String socketEndpoint) {
+    this.socketEndpoint = socketEndpoint;
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.Serializable;
+
+public class ManagedServiceInfo implements Serializable {
+	public enum ServiceState { STARTING, INITIALIZING, READY, FAILED, STOPPING, STOPPED, KILLED };
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	private String pid;
+	private ServiceState state;
+	/**
+	 * @return the state
+	 */
+	public ServiceState getState() {
+		return state;
+	}
+	/**
+	 * @param state the state to set
+	 */
+	public void setState(ServiceState state) {
+		this.state = state;
+	}
+	/**
+	 * @return the pid
+	 */
+	public String getPid() {
+		return pid;
+	}
+	/**
+	 * @param pid the pid to set
+	 */
+	public void setPid(String pid) {
+		this.pid = pid;
+	}
+	public String toString() {
+		return "PID:"+getPid()+" State:"+getState();
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+public interface Process extends Serializable {
+	public String getLogPath();
+	public void setLogPath(String logPath);
+	public String getPid();
+	public void setPid(String pid);
+	public List<String> getCommand();
+	public String getProcessId();
+	public void setProcessId(String processId);
+	public String getPort();
+	public void setPort(String port);
+	public String getNodeIp();
+	public void setNodeIp( String nodeIp);
+	public boolean isAgentProcess();
+//	public void setOSProcess(java.lang.Process target );
+	public void setLogStream(OutputStream ps);
+	public String getNodeName();
+	public void setNodeName(String nodeName);
+	public void setAgentLogPath(String logPath);
+	public String getAgentLogPath();
+	public boolean isAttached();
+	public void setAttached();
+    public void setClientId(String clientId);
+    public String getClientId();
+  public String getAbsoluteLogPath();
+  public void setAbsoluteLogPath(String absoluteLogPath);
+	public String getDescription();
+	public void setDescription(String description);
+	
+	public void setParent( String parent);
+	public String getParent();
+  public Throwable getExceptionStackTrace();
+  public void setExceptionStackTrace(Throwable exceptionStackTrace);
+  
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+
+public class ProcessStreamConsumer extends Thread {
+	public static final String LS = System.getProperty("line.separator");
+
+	private InputStream is;
+	private PrintStream os;
+	DuccLogger logger;
+	private StringBuffer errStreamBuffer = new StringBuffer();
+	public ProcessStreamConsumer(DuccLogger logger, ThreadGroup threadGroup,
+				final String threadName, InputStream is, PrintStream os) {
+		super(threadGroup, threadName);
+		this.os = os;
+		this.is = is;
+		this.logger = logger;
+		setDaemon(true);
+	}
+  public String getDataFromStream() {
+    return errStreamBuffer.toString();
+  }
+	public void run() {
+		try {
+
+			InputStreamReader in = new InputStreamReader(is);
+			BufferedReader reader = new BufferedReader(in);
+			String line;
+			while ((line = reader.readLine()) != null) {
+				line += LS;
+				if ( "StdErrorReader".equals(Thread.currentThread().getName())) {
+					logger.error("ProcessStreamConsumer.run()", null, line.trim());
+					os.print("ERR>>>"+line);
+					// Save stderr stream into a local buffer. When the process exits unexpectedly
+					// errors will be copied to a Process object.
+					errStreamBuffer.append(line.trim());  
+				} else {
+					logger.info("ProcessStreamConsumer.run()", null, line.trim());
+					os.print("OUT>>>"+line);
+				}
+			}
+		} catch (Exception x) {
+		} finally {
+			try {
+				os.flush();
+			} catch (Exception e) {
+				System.out.println("Caught Exception While Flushing "
+						+ Thread.currentThread().getName() + " Output Stream");
+			}
+			try {
+				if (is != null) {
+					is.close();
+				}
+			} catch (IOException e) {
+				// ignore
+			}
+		}
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public abstract class AbstractMetricCollector implements MetricCollector {
+  private int howManyFields;
+  protected int[] metricFieldOffsets;
+  protected int[] metricFieldLengths;
+  protected byte[] metricFileContents = new byte[1024];
+  protected RandomAccessFile metricFile;
+  private int mField;
+  private int fieldOffset;
+  private int currentOffset=0;
+
+  public AbstractMetricCollector(RandomAccessFile metricFile, int howMany, int offset) {
+    this.howManyFields = howMany;
+    this.fieldOffset = offset;
+    this.metricFile = metricFile;
+    metricFieldOffsets = new int[howMany];
+    metricFieldLengths = new int[howMany];
+  }
+  public void parseMetricFile() throws IOException {
+    metricFile.seek(0);
+    metricFile.read(metricFileContents);
+    if ( fieldOffset > 0 ) {
+      //  Advance the pointer just beyond the field name
+      while (metricFileContents[currentOffset] != ' ') {
+        ++currentOffset;
+      }
+    }
+    int currentFieldIndx=0;
+    while( currentFieldIndx++ < howManyFields ) {
+      readNextField();
+    }
+  }
+  private void readNextField() {
+ // Skip column padding.
+    while (metricFileContents[currentOffset] == ' ') {
+      ++currentOffset;
+    }
+    // Find the end of the value.
+    int offset = currentOffset;
+    while (metricFileContents[currentOffset] != ' ' && metricFileContents[currentOffset] != 0) {
+      ++currentOffset;
+    }
+    // Store the value's offset and length.
+    metricFieldOffsets[mField] = offset;
+    metricFieldLengths[mField++] = currentOffset - offset;
+    // Skip to the next value.
+    currentOffset += fieldOffset;
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native