You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:02:19 UTC
svn commit: r1427906 [3/5] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-agent: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/agent/
main/java/org/apache/u...
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.event;
+
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.camel.Body;
+import org.apache.uima.ducc.agent.Agent;
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.ProcessLifecycleController;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.apache.uima.ducc.transport.event.DuccJobsStateEvent;
+import org.apache.uima.ducc.transport.event.ProcessPurgeDuccEvent;
+import org.apache.uima.ducc.transport.event.ProcessStartDuccEvent;
+import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+
+@Qualifier(value = "ael")
+
+public class AgentEventListener implements DuccEventDelegateListener {
+ DuccLogger logger = DuccLogger.getLogger(this.getClass(), Agent.COMPONENT_NAME);
+ ProcessLifecycleController lifecycleController = null;
+
+ private NodeAgent agent;
+ public AgentEventListener(NodeAgent agent, ProcessLifecycleController lifecycleController) {
+ this.agent = agent;
+ this.lifecycleController = lifecycleController;
+ }
+ public AgentEventListener(NodeAgent agent) {
+ this.agent = agent;
+ }
+ public void setDuccEventDispatcher( DuccEventDispatcher eventDispatcher ) {
+ //this.eventDispatcher = eventDispatcher;
+ }
+ private void reportIncomingStateForThisNode(DuccJobsStateEvent duccEvent) throws Exception {
+ StringBuffer sb = new StringBuffer();
+
+ for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
+ if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
+ IDuccProcess process = jobDeployment.getJdProcess();
+ sb.append("\nJD--> JobId:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
+ }
+ for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
+ if ( isTargetNodeForProcess(process) ) {
+ sb.append("\n\tJob ID:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
+ }
+ }
+ }
+ logger.info("reportIncomingStateForThisNode",null,sb.toString());
+ }
+ /**
+ * This method is called by Camel when PM sends DUCC state to agent's queue. It
+ * takes responsibility of reconciling processes on this node.
+ *
+ * @param duccEvent - contains list of current DUCC jobs.
+ * @throws Exception
+ */
+ public void onDuccJobsStateEvent(@Body DuccJobsStateEvent duccEvent) throws Exception {
+ // typically lifecycleController is null and the agent assumes the role. For jUnit testing though,
+ // a different lifecycleController is injected to facilitate black box testing
+ if ( lifecycleController == null ) {
+ lifecycleController = agent;
+ }
+ // Recv'd ducc state update, restart process reaper task which detects missing
+ // OR state due to a network problem.
+// try {
+// agent.restartProcessReaperTimer();
+// } catch( Exception e) {
+// logger.error("onDuccJobsStateEvent", null, "", e);
+// }
+
+ try {
+ // print JP report targeted for this node
+ reportIncomingStateForThisNode(duccEvent);
+
+ List<DuccUserReservation> reservations =
+ duccEvent.getUserReservations();
+ agent.setReservations(reservations);
+ // Stop any process that is in this Agent's inventory but not associated with any
+ // of the jobs we just received
+ agent.takeDownProcessWithNoJob(agent,duccEvent.getJobList());
+ // iterate over all jobs and reconcile those processes that are assigned to this agent. First,
+ // look at the job's JD process and than JPs.
+ for( IDuccJobDeployment jobDeployment : duccEvent.getJobList()) {
+ // check if this node is a target for this job's JD
+ if ( isTargetNodeForProcess(jobDeployment.getJdProcess()) ) {
+ // agent will check the state of JD process and either start, stop, or take no action
+ ICommandLine jdCommandLine = jobDeployment.getJdCmdLine();
+ if(jdCommandLine != null) {
+ agent.reconcileProcessStateAndTakeAction(lifecycleController, jobDeployment.getJdProcess(), jobDeployment.getJdCmdLine(),
+ jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+ }
+ else {
+ logger.error("onDuccJobsStateEvent", null, "job is service");
+ }
+ }
+ // check JPs
+ for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
+ if ( isTargetNodeForProcess(process) ) {
+ // agent will check the state of JP process and either start, stop, or take no action
+ agent.reconcileProcessStateAndTakeAction(lifecycleController, process, jobDeployment.getJpCmdLine(),
+ jobDeployment.getStandardInfo(), jobDeployment.getProcessMemoryAssignment(), jobDeployment.getJobId());
+ }
+ }
+ }
+ } catch( Exception e ) {
+ logger.error("onDuccJobsStateEvent", null, e);
+ }
+ }
+ /**
+ * Wrapper method for Utils.isTargetNodeForMessage()
+ *
+ * @param process
+ * @return
+ * @throws Exception
+ */
+ private boolean isTargetNodeForProcess( IDuccProcess process ) throws Exception {
+ boolean retVal = false;
+ if(process != null) {
+ retVal = Utils.isTargetNodeForMessage(process.getNodeIdentity().getIp(),agent.getIdentity().getIp());
+ }
+ return retVal;
+ }
+ public void onProcessStartEvent(@Body ProcessStartDuccEvent duccEvent) throws Exception {
+ // iterate given ProcessMap and start a Process if this node is a target
+ for( Entry<DuccId, IDuccProcess> processEntry : duccEvent.getProcessMap().entrySet()) {
+ // check if this Process should be launched on this node. A process map
+ // may contain processes not meant to run on this node. Each Process instance
+ // in a Map has a node assignment. Only if this assignment matches this node
+ // the agent will start a process.
+ if ( Utils.isTargetNodeForMessage(processEntry.getValue().getNodeIdentity().getIp(),agent.getIdentity().getIp()) ) {
+ logger.info(">>> onProcessStartEvent", null,"... Agent ["+agent.getIdentity().getIp()+"] Matches Target Node Assignment:"+processEntry.getValue().getNodeIdentity().getIp()+" For Share Id:"+ processEntry.getValue().getDuccId());
+ agent.doStartProcess(processEntry.getValue(),duccEvent.getCommandLine(), duccEvent.getStandardInfo(), duccEvent.getDuccWorkId());
+ if ( processEntry.getValue().getProcessType().equals(ProcessType.Pop)) {
+ break; // there should only be one JD process to launch
+ } else {
+ continue;
+ }
+ }
+ }
+ }
+ public void onProcessStopEvent(@Body ProcessStopDuccEvent duccEvent) throws Exception {
+ for( Entry<DuccId, IDuccProcess> processEntry : duccEvent.getProcessMap().entrySet()) {
+ if ( Utils.isTargetNodeForMessage(processEntry.getValue().getNodeIdentity().getIp(), agent.getIdentity().getIp()) ) {
+ logger.info(">>> onProcessStopEvent", null,"... Agent Received StopProces Event - Process Ducc Id:"+processEntry.getValue().getDuccId()+" PID:"+processEntry.getValue().getPID());
+ agent.doStopProcess(processEntry.getValue());
+ }
+ }
+ }
+ public void onProcessStateUpdate(@Body ProcessStateUpdateDuccEvent duccEvent) throws Exception {
+ logger.info(">>> onProcessStateUpdate", null,"... Agent Received ProcessStateUpdateDuccEvent - Process State:"+duccEvent.getState()+" Process ID:"+duccEvent.getDuccProcessId());
+// agent.updateProcessStatus(duccEvent.getDuccProcessId(), duccEvent.getPid(), duccEvent.getState());
+ agent.updateProcessStatus(duccEvent);
+ }
+ public void onProcessPurgeEvent(@Body ProcessPurgeDuccEvent duccEvent) throws Exception {
+ logger.info(">>> onProcessPurgeEvent", null,"... Agent Received ProcessPurgeDuccEvent -"+" Process ID:"+duccEvent.getProcess().getPID());
+ agent.purgeProcess(duccEvent.getProcess());
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.event;
+
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+
+/**
+ * Callback contract with two notifications, each carrying the
+ * {@link IDuccProcess} it refers to.
+ */
+public interface ProcessLifecycleObserver {
+
+  /**
+   * Process-exit notification. NOTE(review): presumably raised once the given
+   * process has terminated; the callers are not part of this file - confirm.
+   *
+   * @param process the process the notification refers to
+   */
+  void onProcessExit(IDuccProcess process);
+
+  /**
+   * Initialization time-out notification. NOTE(review): the name suggests a JP
+   * that did not finish initializing in time - confirm against the caller.
+   *
+   * @param process the process the notification refers to
+   */
+  void onJPInitTimeout(IDuccProcess process);
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/event/ProcessLifecycleObserver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.exceptions;
+
+/**
+ * Checked exception of the agent's exceptions package. It can be created with
+ * a message only, or with a message and the underlying cause so that the
+ * original stack trace is not lost when an exception is translated.
+ */
+public class DuccNodeException extends Exception {
+
+  private static final long serialVersionUID = -5548490125536576177L;
+
+  /**
+   * @param msg detail message
+   */
+  public DuccNodeException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg detail message
+   * @param cause the exception that triggered this one; may be null
+   */
+  public DuccNodeException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.exceptions;
+
+/**
+ * {@link DuccNodeException} subtype; the name indicates it is meant for
+ * failures during node initialization. It can be created with a message only,
+ * or with a message and the underlying cause.
+ */
+public class DuccNodeInitializationException extends DuccNodeException {
+
+  private static final long serialVersionUID = -2742893598112795349L;
+
+  /**
+   * @param msg detail message
+   */
+  public DuccNodeInitializationException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg detail message
+   * @param cause the exception that triggered this one; may be null
+   */
+  public DuccNodeInitializationException(String msg, Throwable cause) {
+    super(msg);
+    // The cause is attached via initCause so that only the message-only
+    // constructor of the parent class is required.
+    initCause(cause);
+  }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/exceptions/DuccNodeInitializationException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+public abstract class CommandExecutor implements Callable<Process> {
+ protected Process managedProcess = null;
+ protected String host;
+ protected String ip;
+ protected ICommandLine cmdLine;
+ protected NodeAgent agent;
+ public abstract void stop() throws Exception;
+
+ public abstract Process exec(ICommandLine commandLine,
+ Map<String, String> processEnv) throws Exception;
+
+ public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host, String ip, Process managedProcess)
+ throws Exception {
+ this.agent = agent;
+ this.host = host;
+ this.ip = ip;
+ this.managedProcess = managedProcess;
+ this.cmdLine = cmdLine;
+ }
+ /**
+ * Called after process is launched. Assign PID, state and finally drain
+ * process streams.
+ *
+ * @param process - launched process
+ */
+ protected void postExecStep(java.lang.Process process, DuccLogger logger, boolean isKillCmd) {
+ if ( !isKillCmd ) {
+ int pid = Utils.getPID(process);
+ if ( pid != -1 ) {
+ ((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
+ if ( !((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process) ) {
+ // For non uima as processes assume the process is Running after launch
+ if ( !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
+ ((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Running);
+ }
+ }
+ }
+ }
+
+ // Drain process streams in dedicated threads.
+ ((ManagedProcess) managedProcess).drainProcessStreams(process, logger, System.out, isKillCmd);
+ }
+ /**
+ * Called by Executor to exec a process.
+ */
+ public Process call() throws Exception {
+ Process deployedProcess = null;
+ try {
+ //ICommandLine commandLine = ((ManagedProcess) managedProcess).getCommandLine();
+ Map<String, String> env = new HashMap<String, String>();
+ if ( !isKillCommand(cmdLine)) {
+ // Enrich environment for the new process. Via these settings the UIMA AS
+ // service wrapper can notify the agent of its state.
+ env.put("IP", ip);
+ env.put("NodeName", host);
+ // Add process unique ducc id to correlate process state updates
+ env.put("ProcessDuccId", ((ManagedProcess) managedProcess).getDuccId().getUnique());
+ if ( ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
+ ((ManagedProcess) managedProcess).startInitializationTimer();
+ }
+ }
+ deployedProcess = exec(cmdLine, env);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if ( ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
+ ((ManagedProcess) managedProcess).stopInitializationTimer();
+ }
+ }
+ return deployedProcess;
+ }
+ protected boolean isKillCommand(ICommandLine cmdLine) {
+ return ( cmdLine.getExecutable() != null &&
+ ( cmdLine.getExecutable().startsWith("/bin/kill") ||
+ cmdLine.getExecutable().startsWith("taskkill") ) );
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/CommandExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.cmdline.ACommandLine;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
+import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+public class DuccCommandExecutor extends CommandExecutor {
+ // Logger for this executor, obtained under the NodeAgent component name.
+ DuccLogger logger = DuccLogger.getLogger(this.getClass(),
+ NodeAgent.COMPONENT_NAME);
+ // Counter initialised to 30000 and flagged as unused.
+ // NOTE(review): the name suggests port allocation - confirm before removing.
+ @SuppressWarnings("unused")
+ private static AtomicInteger nextPort = new AtomicInteger(30000);
+
+ /**
+  * Creates an executor that acts on behalf of the given agent; all arguments
+  * are passed unchanged to the superclass constructor.
+  *
+  * @param agent - agent on whose behalf the command runs
+  * @param cmdLine - command line to execute
+  * @param host - node name
+  * @param ip - node IP
+  * @param managedProcess - process this executor operates on
+  * @throws Exception
+  */
+ public DuccCommandExecutor(
+     NodeAgent agent,
+     ICommandLine cmdLine,
+     String host,
+     String ip,
+     Process managedProcess) throws Exception {
+   super(agent, cmdLine, host, ip, managedProcess);
+ }
+ /**
+  * Creates an executor without an agent; the agent reference is left null.
+  *
+  * @param cmdLine - command line to execute
+  * @param host - node name
+  * @param ip - node IP
+  * @param managedProcess - process this executor operates on
+  * @throws Exception
+  */
+ public DuccCommandExecutor(
+     ICommandLine cmdLine,
+     String host,
+     String ip,
+     Process managedProcess) throws Exception {
+   this(null, cmdLine, host, ip, managedProcess);
+ }
+
+ /**
+  * Tells whether the process should be spawned via ducc_ling.
+  *
+  * @return false for agent processes and on Windows; otherwise true only when
+  *         the system property ducc.agent.launcher.use.ducc_spawn is set to
+  *         "true" (compared ignoring case)
+  */
+ private boolean useDuccSpawn() {
+   // Agent processes and Windows never go through ducc_ling.
+   boolean spawnCandidate = !super.managedProcess.isAgentProcess() && !Utils.isWindows();
+   // The property is consulted only for candidates; absent means "do not spawn".
+   return spawnCandidate
+       && "true".equalsIgnoreCase(System.getProperty("ducc.agent.launcher.use.ducc_spawn"));
+ }
+
+ public Process exec(ICommandLine cmdLine, Map<String, String> processEnv)
+ throws Exception {
+ String methodName = "exec";
+
+// DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
+ try {
+ String[] cmd = getDeployableCommandLine(cmdLine,processEnv);
+ if ( isKillCommand(cmdLine) ) {
+ logger.info(methodName, null, "Killing process");
+ stopProcess(cmdLine, cmd);
+ } else {
+ startProcess(cmdLine, cmd, processEnv);
+ }
+ return managedProcess;
+ } catch (Exception e) {
+ if ( ((ManagedProcess)super.managedProcess).getDuccProcess() != null ) {
+ DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId();
+ logger.error(methodName, duccId, ((ManagedProcess)super.managedProcess).getDuccProcess().getDuccId(), e, new Object[]{});
+ }
+ throw e;
+ }
+ }
+ /**
+  * @param process - process to inspect
+  * @return true when the process state is Stopped, Failed or FailedInitialization
+  */
+ private boolean isProcessTerminated( IDuccProcess process ) {
+   if ( process.getProcessState().equals(ProcessState.Stopped) ) {
+     return true;
+   }
+   if ( process.getProcessState().equals(ProcessState.Failed) ) {
+     return true;
+   }
+   return process.getProcessState().equals(ProcessState.FailedInitialization);
+ }
+ /**
+  * Stops the managed process. A process that is marked for kill, is of type
+  * Pop, or is in state Initializing, Starting or FailedInitialization is
+  * killed right away by running the given kill command. Any other process is
+  * first sent a stop request; if it is in Running state the method then
+  * blocks on the process Future for the stop time-out and runs the kill
+  * command when the time-out expires while the process is still Running.
+  * Failures after the initial validation are logged, not propagated.
+  *
+  * @param cmdLine - kill command line (not used by this method)
+  * @param cmd - deployable kill command handed to the ProcessBuilder
+  * @throws Exception - when the managed process has no Future or no PID
+  */
+ private void stopProcess(ICommandLine cmdLine, String[] cmd ) throws Exception {
+   String methodName = "stopProcess";
+   ManagedProcess mp = (ManagedProcess) managedProcess;
+   Future<?> future = mp.getFuture();
+   if ( future == null ) {
+     throw new Exception("Future Object not Found. Unable to Stop Process with PID:"+mp.getPid());
+   }
+   // for stop to work, PID must be provided
+   String pid = mp.getDuccProcess().getPID();
+   if ( pid == null || pid.trim().length() == 0 ) {
+     throw new Exception("Process Stop Command Failed. PID not provided.");
+   }
+   long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+   // The agent-less constructor leaves the agent null; keep the default then.
+   if ( super.agent != null
+       && super.agent.configurationFactory != null
+       && super.agent.configurationFactory.processStopTimeout != null ) {
+     maxTimeToWaitForProcessToStop = Long.valueOf(super.agent.configurationFactory.processStopTimeout);
+   }
+   try {
+     // if the process is marked for death or still initializing or it is JD, kill it
+     if ( mp.doKill() ||
+         mp.getDuccProcess().getProcessType().equals(ProcessType.Pop) ||
+         mp.getDuccProcess().getProcessState().equals(ProcessState.Initializing) ||
+         mp.getDuccProcess().getProcessState().equals(ProcessState.Starting) ||
+         mp.getDuccProcess().getProcessState().equals(ProcessState.FailedInitialization)) {
+       // Report the actual state; this branch is not limited to Initializing.
+       logger.info(methodName,mp.getDuccId(),">>>>>>>>>>>>>>> Killing Process:"+mp.getPid()+" .Process State:"+mp.getDuccProcess().getProcessState());
+       doExec(new ProcessBuilder(cmd), cmd, true);
+     } else { // send stop request to quiesce the service
+       Map<String, Object> msgHeader = new HashMap<String, Object>();
+       // Add PID to the header. Receiving process has a PID filter in its router
+       // to determine if the stop message is destined for it.
+       msgHeader.put(DuccExchange.ProcessPID, mp.getPid());
+       msgHeader.put(DuccExchange.DUCCNODEIP, mp.getDuccProcess().getNodeIdentity().getIp());
+       logger.info(methodName,mp.getDuccId(),"Agent Sending Stop request to remote process with PID:"+mp.getPid()+ " On Node:"+mp.getDuccProcess().getNodeIdentity().getIp());
+       try {
+         // Use either Mina (socket) or jms to communicate with an agent
+         if ( mp.getSocketEndpoint() != null ) {
+           logger.info(methodName,null,"Agent Sending <<STOP>> to Endpoint:"+mp.getSocketEndpoint());
+           // socket transport
+           super.agent.getEventDispatcherForRemoteProcess().dispatch(
+               new ProcessStopDuccEvent(new HashMap<DuccId, IDuccProcess>()),
+               mp.getSocketEndpoint(), msgHeader );
+         } else {
+           // jms transport
+           super.agent.getEventDispatcherForRemoteProcess().dispatch(
+               new ProcessStopDuccEvent(new HashMap<DuccId, IDuccProcess>()), msgHeader );
+         }
+       } catch( Exception ex) {
+         logger.error(methodName, mp.getDuccId(), ex, new Object[]{});
+       } finally {
+         // Runs whether or not the dispatch succeeded: the process must still be
+         // given its grace period and be killed if it does not stop.
+         logger.info(methodName,mp.getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+mp.getDuccProcess().getPID()+" Process State: "+mp.getDuccProcess().getProcessState()+" Waiting for Process to Stop. Timout Value:"+maxTimeToWaitForProcessToStop+" millis");
+         try {
+           // Start Kill timer only if process is still running
+           if ( mp.getDuccProcess().getProcessState().equals(ProcessState.Running)) {
+             // the following call will block!!! Waits for process to stop on its own. If it doesnt stop, kills it hard.
+             // The exact time the service is allotted for stopping is defined in ducc.properties
+             logger.info(methodName,mp.getDuccId(),"------------ Agent Starting Timer For Process with PID:"+mp.getDuccProcess().getPID()+" Process State: "+mp.getDuccProcess().getProcessState());
+             future.get(maxTimeToWaitForProcessToStop, TimeUnit.MILLISECONDS);
+           } else {
+             logger.info(methodName,mp.getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+mp.getDuccProcess().getPID()+" Process State: "+mp.getDuccProcess().getProcessState()+" .Process Not In Running State");
+           }
+         } catch (TimeoutException tex) { // on time out kill the process
+           if ( mp.getDuccProcess().getProcessState().equals(ProcessState.Running)) {
+             logger.info(methodName,mp.getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+mp.getDuccProcess().getPID()+" to Stop. Process State:"+mp.getDuccProcess().getProcessState()+" .Process did not stop in alloted time of "+maxTimeToWaitForProcessToStop+" millis");
+             logger.info(methodName,mp.getDuccId(),">>>>>>>>>>>>>>> Killing Process:"+mp.getDuccProcess().getPID()+" .Process State:"+mp.getDuccProcess().getProcessState());
+             doExec(new ProcessBuilder(cmd), cmd, true);
+           } else {
+             logger.info(methodName,mp.getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+mp.getDuccProcess().getPID()+" to Stop but the process is not in a running state. Process State:"+mp.getDuccProcess().getProcessState());
+           }
+         }
+       }
+     }
+   } catch( Exception e) { // InterruptedException, ExecutionException
+     if ( e instanceof InterruptedException ) {
+       // Keep the interrupt visible to the code that owns this thread.
+       Thread.currentThread().interrupt();
+     }
+     logger.error(methodName, mp.getDuccId(), e, new Object[]{});
+   }
+ }
+
+ /**
+  * Launches the command held in {@code cmd} and blocks until the spawned process exits.
+  * <p>
+  * The run time window is recorded on the managed DuccProcess: the start is stamped before
+  * the launch and the end is stamped in a {@code finally} block once {@code doExec} returns
+  * or throws. For processes of type {@code ProcessType.Pop} an init time window with
+  * identical start and end is recorded as well.
+  *
+  * @param cmdLine    command line object; when it is an {@code ACommandLine} its environment
+  *                   is merged into the child environment after {@code processEnv}
+  * @param cmd        fully assembled command array handed to {@link ProcessBuilder}
+  * @param processEnv extra environment entries for the child process; may be {@code null},
+  *                   in which case nothing is added
+  * @throws Exception anything thrown by {@code doExec}
+  */
+ private void startProcess(ICommandLine cmdLine,String[] cmd, Map<String, String> processEnv) throws Exception {
+   String methodName = "startProcess";
+
+   ITimeWindow twr = new TimeWindow();
+   String millis = TimeStamp.getCurrentMillis();
+
+   ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowRun(twr);
+   twr.setStart(millis);
+   ProcessBuilder pb = new ProcessBuilder(cmd);
+
+   if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop)) {
+     // Pop processes get a zero-length init window (start == end)
+     ITimeWindow twi = new TimeWindow();
+     ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowInit(twi);
+     twi.setStart(millis);
+     twi.setEnd(millis);
+   }
+   // The working directory is not set on the ProcessBuilder; getDeployableCommandLine()
+   // hands it to the ducc_ling launcher through its -w option instead.
+   Map<String, String> env = pb.environment();
+   // enrich Process environment; the sibling getDeployableCommandLine() treats processEnv
+   // as nullable, so guard here too instead of failing with a NullPointerException
+   if ( processEnv != null ) {
+     env.putAll(processEnv);
+   }
+   if( cmdLine instanceof ACommandLine ) {
+     // enrich Process environment with one from a given command line
+     env.putAll(((ACommandLine)cmdLine).getEnvironment());
+   }
+   if ( logger.isTrace()) {
+     // dump the effective environment, one entry per trace record
+     for ( Map.Entry<String, String> entry : env.entrySet() ) {
+       String message = "key:"+entry.getKey()+" "+"value:"+entry.getValue();
+       logger.trace(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), message);
+     }
+   }
+   try {
+     doExec(pb, cmd, isKillCommand(cmdLine));
+   } finally {
+     // stamp the end of the run window whether the launch succeeded or failed
+     millis = TimeStamp.getCurrentMillis();
+     twr.setEnd(millis);
+   }
+ }
+ /**
+  * Starts the process described by {@code pb}, hands its streams to {@code postExecStep}
+  * and blocks until the process terminates.
+  * <p>
+  * On any exception the managed process state is set to {@code Failed} and the exception is
+  * rethrown. In all cases the {@code finally} block sets the state to {@code Stopped} unless
+  * it is already InitializationTimeout, FailedInitialization, Failed or Killed.
+  *
+  * @param pb        builder used to start the OS process
+  * @param cmd       command array, used for logging only
+  * @param isKillCmd {@code true} when the command being run is a kill command; drives the
+  *                  log wording and is forwarded to {@code postExecStep}
+  * @throws Exception if the process cannot be started or waiting for it fails
+  */
+ private void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd) throws Exception {
+   String methodName = "doExec";
+
+   try {
+     // Use the caller supplied flag consistently: the forced-kill path calls this method
+     // with isKillCmd=true regardless of the executor's own command line.
+     StringBuilder sb = new StringBuilder((isKillCmd ?"--->Killing Process ":"---> Launching Process:")
+         + " Using command line:");
+     int inx=0;
+     for (String cmdPart : cmd) {
+       sb.append("\n\t[").append(inx++).append("]").append(Utils.resolvePlaceholderIfExists(cmdPart, System.getProperties()));
+     }
+     logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), sb.toString());
+
+     java.lang.Process process = pb.start();
+     // Drain process streams
+     postExecStep(process, logger, isKillCmd);
+     // block waiting for the process to terminate.
+     process.waitFor();
+     if ( !isKillCmd ) {
+       logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), ">>>>>>>>>>>>> Process with PID:"+((ManagedProcess)super.managedProcess).getDuccProcess().getPID()+" Terminated");
+     }
+   } catch( NullPointerException ex) {
+     ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
+     StringBuilder sb = new StringBuilder();
+     sb.append("\n\tJava ProcessBuilder Failed to Launch Process due to NullPointerException. An Entry in the Command Array Must be Null. Look at Command Array Below:\n");
+     if ( cmd != null ) {
+       // Print every slot with its index and mark null entries explicitly, so the
+       // offending position is visible instead of being silently skipped.
+       int inx = 0;
+       for (String cmdPart : cmd) {
+         sb.append("\n\t[").append(inx++).append("]").append(cmdPart == null ? "<null>" : cmdPart);
+       }
+     }
+     logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), sb.toString());
+     throw ex;
+   } catch( Exception ex) {
+     ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
+     throw ex;
+   } finally {
+     // Per team discussion on Aug 31 2011, the process is stopped by an agent when initialization
+     // times out or initialization failed. Both Initialization_Timeout and FailedIntialization imply
+     // that the process is stopped.
+     if ( !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.InitializationTimeout) &&
+          !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.FailedInitialization) &&
+          !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Failed) &&
+          !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Killed)) {
+       ((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Stopped);
+     }
+   }
+
+ }
+
+ private String[] getDeployableCommandLine(ICommandLine cmdLine, Map<String, String> processEnv) throws Exception {
+ String methodName = "getDeployableCommandLine";
+ String[] cmd = new String[0];
+
+ try {
+ // lock using Agent single permit semaphore. The Utils.concatAllArrays()
+ // uses native call (for efficiency) which appears not thread safe.
+ NodeAgent.lock();
+ // Use ducc_ling (c code) as a launcher for the actual process. The ducc_ling
+ // allows the process to run as a specified user in order to write out logs in
+ // user's space as oppose to ducc space.
+ String c_launcher_path =
+ Utils.resolvePlaceholderIfExists(
+ System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());
+
+ // if the command line is kill, don't provide any logging info to the ducc_ling. Otherwise,
+ // ducc_ling creates and empty log for each time we are killing a process
+ if ( isKillCommand(cmdLine) ) {
+ // Duccling, with no logging, always run by ducc, no need for workingdir
+ String[] duccling_nolog = new String[] { c_launcher_path,
+ "-u", ((ManagedProcess)super.managedProcess).getOwner(),
+ "--" };
+ if ( useDuccSpawn() ) {
+ cmd = Utils.concatAllArrays(duccling_nolog, new String[] {cmdLine.getExecutable()},cmdLine.getCommandLine());
+ } else {
+ cmd = Utils.concatAllArrays(new String[] {cmdLine.getExecutable()},cmdLine.getCommandLine());
+ }
+ } else {
+ String processType = "-UIMA-";
+ if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop)) {
+ processType = "-JD-";
+ }
+ String processLogDir = ((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory()+
+ (((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory().endsWith(File.separator) ? "" : File.separator)+
+ ((ManagedProcess)super.managedProcess).getWorkDuccId()+File.separator;
+ String processLogFile = ((ManagedProcess)super.managedProcess).getWorkDuccId()+ processType+host;
+ String workingDir = ((ManagedProcess)super.managedProcess).getProcessInfo().getWorkingDirectory();
+ if ( workingDir == null ) {
+ workingDir = "NONE";
+ }
+
+ // Duccling, with logging
+ String[] duccling = new String[] { c_launcher_path,
+ "-f", processLogDir+processLogFile,
+ "-w", workingDir,
+ "-u", ((ManagedProcess)super.managedProcess).getOwner(),
+ "--" };
+ // For now, log to user's home directory
+ // String baseDir =
+ // ((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory();
+
+ String executable = cmdLine.getExecutable();
+ // Check if user specified which java to use to launch the process. If not provided,
+ // use the same java that the agent is running with
+ if (executable == null || executable.trim().length() == 0 ) {
+ executable = System.getProperty("java.home")+File.separator+"bin"+File.separator+"java";
+ }
+ List<String> operationalProperties = new ArrayList<String>();
+
+ if ( cmdLine instanceof JavaCommandLine ) {
+ String duccHomePath = null;
+ if ( (duccHomePath = System.getenv("DUCC_HOME")) == null ) {
+ duccHomePath = System.getProperty("DUCC_HOME");
+ }
+ operationalProperties.add("-DDUCC_HOME="+duccHomePath);
+ operationalProperties.add("-Dducc.deploy.configuration="+
+ System.getProperty("ducc.deploy.configuration"));
+ if ( System.getProperties().containsKey("ducc.agent.managed.process.state.update.endpoint.type") ) {
+ String type = System.getProperty("ducc.agent.managed.process.state.update.endpoint.type");
+ if (type != null && type.equalsIgnoreCase("socket")) {
+ operationalProperties.add("-D"+NodeAgent.ProcessStateUpdatePort+"="+System.getProperty(NodeAgent.ProcessStateUpdatePort));
+ }
+ }
+ operationalProperties.add("-Dducc.process.log.dir="+processLogDir);
+ operationalProperties.add("-Dducc.process.log.basename="+processLogFile); //((ManagedProcess)super.managedProcess).getWorkDuccId()+ processType+host);
+ operationalProperties.add("-Dducc.job.id="+((ManagedProcess)super.managedProcess).getWorkDuccId());
+
+ }
+ String[] operationalPropertiesArray = new String[operationalProperties.size()];
+
+ if ( useDuccSpawn() ) {
+ cmd = Utils.concatAllArrays(duccling, new String[] {executable}, operationalProperties.toArray(operationalPropertiesArray), cmdLine.getCommandLine());
+ } else {
+ cmd = Utils.concatAllArrays(new String[] {executable}, operationalProperties.toArray(operationalPropertiesArray), cmdLine.getCommandLine());
+ }
+ // add JobId to the env
+ if ( processEnv != null ) {
+ processEnv.put("JobId", String.valueOf(((ManagedProcess)super.managedProcess).getWorkDuccId().getFriendly()));
+ // for now just use user.home. In the long run the LogDir may
+ // come from job spec
+ }
+ }
+ return cmd;
+ } catch( Exception ex) {
+ ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
+ throw ex;
+ } finally {
+ NodeAgent.unlock();
+ }
+
+ }
+ /**
+  * No-op: this implementation performs no work when invoked.
+  */
+ public void stop() {
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * Parses jvm args String provided by the user and returns a List of jvm args.
+ * The parser only expects -X and -D in the jvm args String. Supports a mix
+ * of -X and -D args in an arbitrary order.
+ *
+ */
+public class JvmArgsParser {
+
+ /**
+ * Parse -X args
+ *
+ * @param cmd2run - where to add -X args
+ * @param arg - string containing zero or more -X args
+ */
+ private static void addJvmOptions(List<String> cmd2run, String arg) {
+ if ( arg == null || arg.trim().length() == 0 ) {
+ return;
+ }
+ boolean ignoreFirstArg = false;
+ // the arg string may start with non -X arg, in which case ignore
+ // that first part.
+ if ( !arg.trim().startsWith("-")) {
+ ignoreFirstArg = true;
+ }
+ // Get -X tokens
+ String[] jvm_args = arg.split("-X");
+ for( String jvm_arg : jvm_args ) {
+ // check if the first token contains non -X part, in which case ignore it
+ if ( ignoreFirstArg ) {
+ ignoreFirstArg = false;
+ continue;
+ }
+ // check if this is valid -X token and add it to a given list
+ if ( jvm_arg.trim().length() > 0 ) {
+ cmd2run.add("-X"+jvm_arg.trim()); // -X was stripped by split()
+ }
+ }
+ }
+ /**
+ * Return a List of jvm args contained in a provided args string.
+ * It first tokenizes the args string extracting -D args. Secondly,
+ * it parses each token and extracts -X args.
+ *
+ * @param args - string containing a mix of -X and -D args
+ *
+ * @return
+ */
+ public static List<String> parse(String args) {
+ List<String> jvm_args = new ArrayList<String>();
+ // tokenize on -D boundaries. Produced tokens may contain both -D and -X args
+ String[] jvm_args_array = args.split("-D");
+ for (String arg : jvm_args_array) {
+ if (arg.trim().length() > 0) {
+ int start = 0;
+ arg = arg.trim();
+ // check if the current token contains -X arg
+ if ((start = arg.indexOf("-X")) > -1) {
+ // parse current token and add -X args to the list
+ addJvmOptions(jvm_args, arg);
+ } else {
+ jvm_args.add("-D" + arg); // -D was stripped by split()
+ continue;
+ }
+ if (start == 0) {
+ continue;
+ }
+ jvm_args.add("-D" + arg.substring(0, start).trim()); // -D was stripped by split()
+ }
+ }
+ return jvm_args;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/JvmArgsParser.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.net.InetAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+
+
+public class Launcher {
+  private final ExecutorService executorService;
+  private DuccIdFactory duccIdFactory = new DuccIdFactory();
+
+  public Launcher() {
+    executorService = Executors.newCachedThreadPool();
+  }
+
+  /**
+   * This method is used to launch multiple Agents on the same physical machine. It allows
+   * to scale up Agents on a single machine to simulate load. Each Agent instance will
+   * assume a given name and IP address.
+   * <p>
+   * The IP and node name are validated before the first Agent is launched, so a bad
+   * argument no longer leaves a partially started set of Agents behind.
+   *
+   * @param cmdLine - defines the command line used to exec Agent process
+   * @param howMany - how many Agents to exec; nothing is launched if not positive
+   * @param ip - initial IP address to assign to first Agent instance. Subsequent Agent instances
+   *             will have the last component incremented as in XXX.XXX.XXX.XX+1,XXX.XXX.XXX.XX+2, etc
+   * @param nodeName - name of the physical machine. The Agents will be assigned this name plus
+   *             a suffix as in watson-1, watson-2, etc. For a dotted name the suffix is
+   *             inserted before the first dot.
+   * @throws IllegalArgumentException if ip or nodeName is null while howMany is positive
+   * @throws NumberFormatException if the last component of ip is not an integer
+   * @throws Exception if launching an Agent fails
+   */
+  public void start(ICommandLine cmdLine, int howMany, String ip, String nodeName) throws Exception {
+    if ( howMany <= 0 ) {
+      return;
+    }
+    if ( ip == null ) {
+      throw new IllegalArgumentException("Initial IP address must not be null");
+    }
+    if ( nodeName == null ) {
+      throw new IllegalArgumentException("Node name must not be null");
+    }
+    // Split the IP once, up front: everything up to and including the last dot stays
+    // fixed, the trailing number is incremented per Agent.
+    int lastDot = ip.lastIndexOf(".");
+    String ipPrefix = ip.substring(0, lastDot + 1);
+    int firstSuffix = Integer.parseInt(ip.substring(lastDot + 1));
+
+    int firstNameDot = nodeName.indexOf(".");
+    // Launch as many agents as requested
+    for( int i=0; i < howMany; i++ ) {
+      // Append suffix to node name for each Agent instance
+      String host;
+      if ( firstNameDot > -1 ) {
+        host = nodeName.substring(0, firstNameDot) + "-" + (i+1) + nodeName.substring(firstNameDot);
+      } else {
+        host = nodeName + "-" + (i+1);
+      }
+      launchProcess(host, ipPrefix + (firstSuffix + i), cmdLine);
+    }
+  }
+
+  /**
+   * Submit request to exec a process. The process will be exec'd in a separate thread.
+   * The Future returned by the executor service is stored on the managed process.
+   *
+   * @param agent - agent passed to the command executor
+   * @param nodeIdentity - supplies the node name and IP passed to the command executor
+   * @param process - not used by this method
+   * @param commandLine - command line to exec
+   * @param observer - not used by this method
+   * @param managedProcess - process wrapper handed to the executor; receives the Future
+   * @return the managedProcess argument
+   * @throws Exception
+   */
+  public ManagedProcess launchProcess(NodeAgent agent, NodeIdentity nodeIdentity,IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, ManagedProcess managedProcess)
+      throws Exception {
+    // Instantiate executor that will actually exec the process using java's ProcessBuilder
+    DuccCommandExecutor executor =
+        new DuccCommandExecutor(agent, commandLine, nodeIdentity.getName(),nodeIdentity.getIp(), managedProcess);
+    Future<?> future = executorService.submit(executor);
+    // if we are launching a process, save the future object returned from Executor above
+    managedProcess.setFuture(future);
+    return managedProcess;
+  }
+
+  /**
+   * This method is used for testing only. It enables launching an agent with
+   * a given name and IP address which are different from a physical node name
+   * and IP address. With that multiple agents can be launched on the same
+   * physical machine simulating a cluster of nodes.
+   *
+   * @param host - node name the Agent should assume
+   * @param ip - IP address the Agent should assume
+   * @param cmdLine - command line used to exec the Agent
+   * @throws Exception
+   */
+  public void launchProcess(String host, String ip, ICommandLine cmdLine) throws Exception {
+    IDuccProcess process =
+        new DuccProcess(duccIdFactory.next(), new NodeIdentity(ip, host));
+    process.setProcessType(ProcessType.Pop);
+    ManagedProcess managedProcess = new ManagedProcess(process, cmdLine, true);
+    DuccCommandExecutor executor =
+        new DuccCommandExecutor(cmdLine, host, ip, managedProcess);
+    executorService.submit(executor);
+  }
+
+  /**
+   * Launches the requested number of Agents on this machine.
+   * Expects the number of Agents as the first argument and the initial IP address
+   * in the "IP" system property; DUCC_HOME is read from the environment.
+   */
+  public static void main(String[] args) {
+    if ( args.length < 1 ) {
+      System.err.println("Usage: java -DIP=<initial agent ip> " + Launcher.class.getName() + " <number of agents>");
+      return;
+    }
+    String ip = System.getProperty("IP");
+    if ( ip == null ) {
+      System.err.println("Missing required system property: -DIP=<initial agent ip>");
+      return;
+    }
+    try {
+      int howMany = Integer.parseInt(args[0]); // how many agent processes to launch
+      String nodeName = InetAddress.getLocalHost().getHostName();
+      Launcher launcher = new Launcher();
+      JavaCommandLine cmdLine = new JavaCommandLine("java");
+      String duccHome = System.getenv("DUCC_HOME");
+      cmdLine.addOption("-Dducc.deploy.configuration="+duccHome+"/resources/ducc.properties");
+      cmdLine.addOption("-Dducc.deploy.components=agent");
+      cmdLine.addOption("-Djava.library.path=" + duccHome +"/lib/sigar");
+      cmdLine.addOption("-DDUCC_HOME=" + duccHome);
+
+      // Spawned Agents reuse the classpath of this launcher JVM
+      cmdLine.setClasspath(System.getProperty("java.class.path"));
+      cmdLine.setClassName("org.apache.uima.ducc.agent.common.main.DuccService");
+      launcher.start(cmdLine, howMany, ip, nodeName);
+    } catch( Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Launcher.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
+import org.apache.uima.ducc.agent.launcher.ManagedServiceInfo.ServiceState;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+
+
+public class ManagedProcess implements Process {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private static final String JMXPortProperty = "com.sun.management.jmxremote.port";
+ /*
+ * Process States
+ * +------- STARTING
+ * / |
+ * / |
+ * V V
+ * INITIALIZING ------> FAILED
+ * | /
+ * | /
+ * V /
+ * READY ------>
+ * |
+ * |
+ * V
+ * STOPPED
+ */
+ private ServiceState state= ServiceState.STARTING;
+ public static enum ProcessType {
+ UIMA_SERVICE{},
+ APPLICATION{},
+ LAUNCHER{};
+ };
+
+
+ public static enum StopPriority {
+ KILL_9, QUIESCE_AND_STOP
+ }
+ private volatile boolean destroyed=false;
+ private IDuccProcess duccProcess;
+ // Used to kill a process after it reports its PID.
+ private volatile boolean killAfterLaunch=false;
+ private String pid;
+ private String processCorrelationId;
+ // JMX Port
+ private String port = null;
+ // Service socket endpoint
+ private String socketEndpoint = null;
+
+ private int tgCounter=1;
+ private String owner;
+ // log path for http access
+ private String logPath;
+ // absolute path to the process log
+ private String absoluteLogPath;
+
+ private String agentLogPath;
+ private boolean agentProcess = false;
+ private String nodeIp;
+ private String nodeName;
+ private String description;
+ private String parent;
+
+ private ProcessType processType;
+ private volatile boolean attached;
+ private String clientId;
+// private volatile boolean killed;
+ private Throwable exceptionStackTrace;
+ // The following are transients to prevent serialization when sending heartbeat updates
+ // to the controller. This class exports its state via AgentManagedProcess interface.
+ private transient List<String> command;
+ private transient java.lang.Process process;
+
+ protected transient ProcessStreamConsumer stdOutReader;
+ protected transient ProcessStreamConsumer stdErrReader;
+ private transient OutputStream processLogStream = null;
+ private transient CountDownLatch pidReadyCount = new CountDownLatch(1);
+
+ private ICommandLine commandLine;
+
+ private ProcessLifecycleObserver observer;
+
+ private Timer initTimer;
+
+ // timer used to cleanup UIMA pipeline initializations stats
+ private Timer cleanupTimer;
+
+ private DuccId workDuccId;
+
+ private IDuccStandardInfo processInfo;
+
+ private transient Future<?> future;
+
+ private transient volatile boolean isstopping;
+
+ private transient volatile boolean forceKill;
+
+ private transient DuccLogger logger;
+
+ private long processMemoryAssignment;
+
+ /**
+  * Creates a managed process with no observer, no logger and a memory assignment of 0.
+  *
+  * @param process the DUCC process being managed
+  * @param commandLine command line associated with the process
+  */
+ public ManagedProcess(IDuccProcess process, ICommandLine commandLine) {
+ this(process, commandLine, null, null,0);
+ }
+
+ /**
+  * Same as the two-argument constructor, additionally recording the agentProcess flag.
+  *
+  * @param process the DUCC process being managed
+  * @param commandLine command line associated with the process
+  * @param agentProcess value later returned by isAgentProcess()
+  */
+ public ManagedProcess(IDuccProcess process, ICommandLine commandLine,boolean agentProcess) {
+ this(process, commandLine, null, null,0);
+ this.agentProcess = agentProcess;
+ }
+
+ /**
+  * Full constructor; stores every argument in the corresponding field.
+  *
+  * @param process the DUCC process being managed
+  * @param commandLine command line associated with the process
+  * @param observer lifecycle observer; may be null (the shorter constructors pass null)
+  * @param logger logger used by this class; may be null, in which case log() does nothing
+  * @param processMemoryAssignment memory assignment value; units are not visible here
+  */
+ public ManagedProcess(IDuccProcess process, ICommandLine commandLine,ProcessLifecycleObserver observer, DuccLogger logger, long processMemoryAssignment) {
+ this.commandLine = commandLine;
+ this.duccProcess = process;
+ this.observer = observer;
+ this.logger = logger;
+ this.processMemoryAssignment = processMemoryAssignment;
+ }
+ /**
+  * Wraps an already started OS process. Note that duccProcess and commandLine are
+  * not set by this constructor.
+  *
+  * @param process the OS process handle
+  * @param agentProcess value later returned by isAgentProcess()
+  * @param correlationId correlation id of the process; may be null. It is now retained in
+  *                      processCorrelationId instead of being silently discarded.
+  */
+ public ManagedProcess( java.lang.Process process, boolean agentProcess, String correlationId ) {
+   this.process = process;
+   this.agentProcess = agentProcess;
+   this.processCorrelationId = correlationId;
+ }
+ /**
+  * Wraps an already started OS process without a correlation id.
+  *
+  * @param process the OS process handle
+  * @param agentProcess value later returned by isAgentProcess()
+  */
+ public ManagedProcess( java.lang.Process process, boolean agentProcess ) {
+ this(process, agentProcess, null );
+ }
+ /** @return true once kill() has been called on this object */
+ public boolean doKill() {
+ return forceKill;
+ }
+ /** Sets the force-kill flag. This method only records the flag; it terminates nothing itself. */
+ public void kill() {
+ forceKill = true;
+ }
+ /** Marks this process as being stopped; the flag is never reset by this method's counterpart. */
+ public void setStopping() {
+ isstopping = true;
+ }
+ /** @return true once setStopping() has been called */
+ public boolean isStopping() {
+ return isstopping;
+ }
+ /** @param workDuccId id of the work item this process belongs to */
+ public void setWorkDuccId( DuccId workDuccId ) {
+ this.workDuccId = workDuccId;
+ }
+ /** @return the id set via setWorkDuccId(), or null if none was set */
+ public DuccId getWorkDuccId() {
+ return this.workDuccId;
+ }
+ /** @return the DuccId of the wrapped DUCC process (delegates to duccProcess) */
+ public DuccId getDuccId() {
+ return duccProcess.getDuccId();
+ }
+ /** @param parent the parent value to store */
+ public void setParent( String parent) {
+ this.parent = parent;
+ }
+ /** @return the value set via setParent() */
+ public String getParent() {
+ return parent;
+ }
+
+ /** @param clientId the client id to store */
+ public void setClientId(String clientId){
+ this.clientId = clientId;
+ }
+ /** @return the value set via setClientId() */
+ public String getClientId() {
+ return clientId;
+ }
+ /** @return true once setAttached() has been called */
+ public boolean isAttached() {
+ return attached;
+ }
+
+ /** Sets the attached flag to true. */
+ public void setAttached() {
+ this.attached = true;
+ }
+
+ /** @return the node name set via setNodeName() */
+ public String getNodeName() {
+ return nodeName;
+ }
+
+ /** @param nodeName the node name to store */
+ public void setNodeName(String nodeName) {
+ this.nodeName = nodeName;
+ }
+
+ /** @return the node IP set via setNodeIp() */
+ public String getNodeIp() {
+ return nodeIp;
+ }
+
+ /** @param nodeIp the node IP to store */
+ public void setNodeIp(String nodeIp) {
+ this.nodeIp = nodeIp;
+ }
+
+ /**
+ * Returns this object itself, typed as the Process interface.
+ *
+ * @return this instance
+ */
+ public Process getInstance() {
+ return this;
+ }
+ /** @return the ManagedProcess.ProcessType set via setProcessType(), or null if unset */
+ public ProcessType getProcessType() {
+ return processType;
+ }
+ /** @param processType the ManagedProcess.ProcessType to store */
+ public void setProcessType(ProcessType processType) {
+ this.processType = processType;
+ }
+ /** @param os stream stored as the process log stream */
+ public void setLogStream(OutputStream os ) {
+ processLogStream = os;
+ }
+ /** @return the agentProcess flag supplied at construction */
+ public boolean isAgentProcess() {
+ return agentProcess;
+ }
+ /**
+ * @param logPath the log path to set (the field is described as the log path for http access)
+ */
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
+ /**
+ * @return the log path set via setLogPath()
+ */
+ public String getLogPath() {
+ return logPath;
+ }
+ /** @param logPath the agent log path to set */
+ public void setAgentLogPath(String logPath) {
+ this.agentLogPath = logPath;
+ }
+
+ /**
+ * @return the agent log path set via setAgentLogPath()
+ */
+ public String getAgentLogPath() {
+ return agentLogPath;
+ }
+ /**
+ * @return the user recorded in processInfo, or null when processInfo is not set
+ */
+ public String getOwner() {
+ if ( processInfo != null ) {
+ return processInfo.getUser();
+ }
+ return null;
+ }
+
+
+ /** @param process the OS process handle to associate with this object */
+ public void setOSProcess(java.lang.Process process ) {
+ this.process = process;
+ }
+ /**
+  * Writes an info record through the logger, if one was supplied at construction.
+  *
+  * @param method name reported as the logging method
+  * @param message text to log
+  */
+ private void log( String method, String message ) {
+   if ( logger == null ) {
+     // no logger configured: drop the message
+     return;
+   }
+   logger.info(method, null, message);
+ }
+ /**
+  * Consumes stdout and stderr of the given OS process on two dedicated threads, blocks
+  * until the process has exited, then derives the final state of the DUCC process.
+  * <p>
+  * If isKillCmd is true, or the recorded reason for stopping is FailedInitialization or
+  * InitializationTimeout, the DUCC process state is set to Stopped and no observer is
+  * notified. Otherwise a state is chosen (Killed if kill() was called, Failed if the process
+  * died while not being stopped, Stopped if it was being stopped) and handed to
+  * notifyProcessObserver(); this method does not call setProcessState() on that path.
+  *
+  * @param process OS process whose streams are consumed
+  * @param logger logger handed to the stream consumers (shadows the instance field here)
+  * @param pStream print stream handed to both stream consumers
+  * @param isKillCmd true when the process being drained is a kill command
+  */
+ public void drainProcessStreams(java.lang.Process process , DuccLogger logger, PrintStream pStream, boolean isKillCmd ) { // InputStream outputStream, InputStream errorStream) {
+
+ // Create dedicated Thread Group for the process stream consumer threads
+ ThreadGroup group = new ThreadGroup("AgentDeployer" + tgCounter++);
+ // Fetch the standard OUTPUT of the deployed process. Despite the variable name this is
+ // not the child's stdin: Process.getInputStream() is the stream the parent reads from.
+ InputStream stdin = process.getInputStream();
+ // Fetch stderr from the deployed process
+ InputStream stderr = process.getErrorStream();
+ // Create dedicated thread to consume std output stream from the process
+ stdOutReader = new ProcessStreamConsumer(logger, group, "StdOutputReader", stdin, pStream);
+ // Create dedicated thread to consume std error stream from the process
+ stdErrReader = new ProcessStreamConsumer(logger, group, "StdErrorReader", stderr, pStream);
+
+ // Start both stream consumer threads
+ stdOutReader.start();
+ stdErrReader.start();
+ // block until the process is terminated or the agent terminates
+ boolean finished = false;
+ while (!finished) {
+ try {
+ process.waitFor();
+ } catch (InterruptedException e) {
+ // interrupt is ignored on purpose of the loop: fall through and re-check below
+ }
+ try {
+ // exitValue() throws IllegalThreadStateException while the process is still alive
+ process.exitValue();
+ finished = true;
+ } catch (IllegalThreadStateException e) {
+ // still running: loop and wait again
+ }
+ }
+ try {
+ // wait for stdout and stderr reader threads to finish. Join for max of 2 secs
+ // The process has exited and in theory the join should return quickly.
+ // We do the join to make sure that the streams are drained so that we
+ // can get a reason for failure if there was a problem launching the process.
+ stdOutReader.join(2000);
+ stdErrReader.join(2000);
+ } catch( InterruptedException ie) {
+ log("ManagedProcess.drainProcessStreams","Interrupted While Awaiting Termination of StdOutReader and StdErrReader Threads");
+ }
+
+ String reason =
+ getDuccProcess().getReasonForStoppingProcess();
+
+ // state at the time the process terminated; reused below as the state to report
+ ProcessState pstate = getDuccProcess().getProcessState();
+
+ if ( isKillCmd ||
+ // if the process is to be killed due to init problems, set the state to Stopped
+ ( reason != null &&
+ (
+ reason.equals(ReasonForStoppingProcess.FailedInitialization.toString() ) ||
+ reason.equals(ReasonForStoppingProcess.InitializationTimeout.toString() )
+ )
+ )
+ ) {
+ getDuccProcess().setProcessState(ProcessState.Stopped);
+
+ } else {
+ // Process has terminated. Determine why the process terminated.
+ log("ManagedProcess.drainProcessStreams", "Ducc Process with PID:"+getPid() +" Terminated while in "+pstate+" State");
+
+ // true if agent killed the process. Process either exceeded memory use or
+ // the PM state notifications stopped coming in.
+ if ( doKill() ) {
+ // Agent killed the process due to timeout waiting for OR state
+ pstate = ProcessState.Killed;
+ } else {
+ // Check if the process died due to an external cause; in that case
+ // isstopping is false. isstopping is true iff the Agent initiated the
+ // process stop because the process was deallocated.
+ if ( !isstopping) {
+ // unexpected process death. Killed by the OS or admin user with kill -9
+ pstate = ProcessState.Failed;
+ // fetch errors from stderr stream. If the process failed to start due to misconfiguration
+ // the reason for failure would be provided by the OS (wrong user id, bad directory,etc)
+ // NOTE(review): assumes getDataFromStream() never returns null - confirm
+ String errors = stdErrReader.getDataFromStream();
+ if ( errors.trim().length() > 0 ) {
+ getDuccProcess().setReasonForStoppingProcess(errors.trim());
+ } else {
+ getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.Croaked.toString());
+ }
+ } else {
+ // If a reason for stopping is already recorded, the following call leaves it
+ // untouched; otherwise it records Deallocated.
+ addReasonForStopping(getDuccProcess(),ReasonForStoppingProcess.Deallocated.toString());
+ pstate = ProcessState.Stopped;
+ }
+ // Only reached when doKill() is false; the Killed state chosen above is not
+ // passed to the observer by this method.
+ notifyProcessObserver(pstate);
+ log("ManagedProcess.drainProcessStreams","************ Remote Process PID:"+getPid()+" Terminated ***************");
+ }
+ }
+ }
+ /**
+  * Records {@code reason} as the reason for stopping the given process, but only
+  * when no reason has been recorded yet (null or blank). An already recorded
+  * reason is never overwritten.
+  *
+  * @param process the process to update; when null the process returned by
+  *                getDuccProcess() is used instead
+  * @param reason the reason to record
+  */
+ private void addReasonForStopping(IDuccProcess process, String reason) {
+   // Honor the process handed in by the caller. The parameter used to be ignored
+   // and getDuccProcess() was always updated instead.
+   IDuccProcess target = ( process != null ) ? process : getDuccProcess();
+   String current = target.getReasonForStoppingProcess();
+   if ( current == null || current.trim().length() == 0 ) {
+     target.setReasonForStoppingProcess(reason);
+   }
+ }
+ /**
+ * @return the state
+ */
+ public ServiceState getStatus() {
+   // Plain accessor for the state field
+   return this.state;
+ }
+
+ /**
+ * @return the pid
+ */
+ public String getPid() {
+   // The PID is read from the DuccProcess record, not from the local pid field
+   return this.duccProcess.getPID();
+ }
+ /**
+ * @param pid the pid to set
+ */
+ public void setPid(final String pid) {
+   // Store the PID locally and mirror it into the DuccProcess record
+   this.pid = pid;
+   this.duccProcess.setPID(pid);
+   // Count the latch down so that awaitPid() can return
+   this.pidReadyCount.countDown();
+ }
+ /**
+  * Blocks the calling thread until the PID latch has been counted down, which
+  * happens in setPid() and releasePidLatch(). If the thread is interrupted while
+  * waiting, the method returns early and the interrupt status is restored so
+  * that callers can still observe it.
+  */
+ public void awaitPid() {
+   try {
+     pidReadyCount.await();
+   } catch( InterruptedException e) {
+     // Do not lose the interrupt: re-assert it for the caller
+     Thread.currentThread().interrupt();
+   }
+ }
+ /**
+  * Counts the PID latch down without setting a PID, so that a thread waiting in
+  * awaitPid() can proceed.
+  */
+ public void releasePidLatch() {
+   this.pidReadyCount.countDown();
+ }
+ /**
+ * @return the command
+ */
+ public List<String> getCommand() {
+   // Returns the stored list itself, not a copy
+   return this.command;
+ }
+ /**
+ * @param commandToRun the command to set
+ */
+ public void setCommand(final List<String> commandToRun) {
+   // The list is stored by reference; no defensive copy is made
+   this.command = commandToRun;
+ }
+
+ /**
+ * @return the processCorrelationId
+ */
+ public String getProcessId() {
+   // The process id exposed here is the processCorrelationId field
+   return this.processCorrelationId;
+ }
+ /**
+ * @param processId the processCorrelationId to set
+ */
+ public void setProcessId(final String processId) {
+   // Stored in the processCorrelationId field
+   this.processCorrelationId = processId;
+ }
+ /**
+ * @return the jmxPort
+ */
+ public String getPort() {
+   // Plain accessor for the port field
+   return this.port;
+ }
+ /**
+ * @param port the jmxPort to set
+ */
+ public void setPort(final String port) {
+   // Plain mutator for the port field
+   this.port = port;
+ }
+ /**
+  * Announces on stdout that the remote managed process is being stopped.
+  * The JMX call that would actually terminate the service is commented out,
+  * so at present this method only prints the message.
+  *
+  * @throws Exception declared by the signature; nothing in the current body throws it
+  */
+ public void terminateRemoteProcess() throws Exception {
+   final String notice = "Stopping Remote Managed Process via JMX";
+   System.out.println(notice);
+   // Using JMX instruct the service to terminate (currently disabled)
+// if ( processMBean != null ) {
+// processMBean.getManagedServiceMBean().terminate();
+// }
+ }
+ /**
+  * Handles a failure of the managed process by delegating to cleanup().
+  * The status, managedProcessInfo and end-date updates that once lived here
+  * are disabled.
+  */
+ public void failed() {
+   // Disabled: setStatus(ServiceState.FAILED)
+   this.cleanup();
+   // Disabled: managedProcessInfo.setState(state) and
+   // setEndDate(new DateTime()) when getEndDate() == null
+ }
+ private void cleanup() {
+ if ( process != null ) {
+ process.destroy();
+ }
+ destroyed = true;
+
+ }
+ /**
+  * Stops the managed process by delegating to cleanup().
+  * The status and end-date updates that once lived here are disabled.
+  */
+ public void stop() {
+   // Disabled: setStatus(ServiceState.STOPPED) unless the status is FAILED
+   this.cleanup();
+   // Disabled: setEndDate(new DateTime()) when getEndDate() == null.
+   // The process.destroy() / destroyed = true steps that were inlined here
+   // are performed by cleanup().
+ }
+ /**
+  * Delegates to waitFor() on the underlying process object.
+  *
+  * @throws InterruptedException propagated from the underlying waitFor() call
+  */
+ public void waitFor() throws InterruptedException {
+   this.process.waitFor();
+ }
+
+ /**
+  * @return the destroyed flag, which cleanup() sets to true
+  */
+ public boolean isDestroyed() {
+   return this.destroyed;
+ }
+ /**
+  * @return the absoluteLogPath
+  */
+ public String getAbsoluteLogPath() {
+   return this.absoluteLogPath;
+ }
+ /**
+  * @param absoluteLogPath the absoluteLogPath to set
+  */
+ public void setAbsoluteLogPath(final String absoluteLogPath) {
+   this.absoluteLogPath = absoluteLogPath;
+ }
+ /**
+  * @return the description
+  */
+ public String getDescription() {
+   return this.description;
+ }
+ /**
+  * @param description the description to set
+  */
+ public void setDescription(final String description) {
+   this.description = description;
+ }
+ /**
+  * @return the stored Throwable, as last passed to setExceptionStackTrace
+  */
+ public Throwable getExceptionStackTrace() {
+   return this.exceptionStackTrace;
+ }
+ /**
+  * @param exceptionStackTrace the Throwable to store
+  */
+ public void setExceptionStackTrace(final Throwable exceptionStackTrace) {
+   this.exceptionStackTrace = exceptionStackTrace;
+ }
+ /**
+ * @return the processSpecification
+ */
+ public ICommandLine getCommandLine() {
+   // Plain accessor for the commandLine field
+   return this.commandLine;
+ }
+ /**
+ * @return the duccProcess
+ */
+ public IDuccProcess getDuccProcess() {
+   // Plain accessor for the duccProcess field
+   return this.duccProcess;
+ }
+ /**
+  * Forwards a process state change to the registered observer. Nothing happens
+  * when there is no observer or no DuccProcess.
+  * <p>
+  * For {@code InitializationTimeout} only onJPInitTimeout() is invoked and the
+  * process state is left untouched. For every other state the state is first
+  * stored on the DuccProcess and onProcessExit() is then invoked.
+  *
+  * @param state the state to report
+  */
+ public void notifyProcessObserver(ProcessState state) {
+   if ( observer == null || getDuccProcess() == null ) {
+     return;
+   }
+   if ( ProcessState.InitializationTimeout.equals(state) ) {
+     observer.onJPInitTimeout(getDuccProcess());
+     return;
+   }
+   getDuccProcess().setProcessState(state);
+   observer.onProcessExit(getDuccProcess());
+ }
+ /**
+  * Starts a timer that fires an InitializationTask once the initialization
+  * timeout elapses. The timeout is read, in milliseconds, from the system
+  * property {@code ducc.agent.launcher.process.init.timeout}; when the property
+  * is absent, not a number, or negative, the default of 2 hours is used.
+  */
+ public void startInitializationTimer() {
+   final long defaultTimeout = 7200*1000; // 2 hours
+   long timeout = defaultTimeout;
+   String str_timeout = System.getProperty("ducc.agent.launcher.process.init.timeout");
+   if ( str_timeout != null ) {
+     try {
+       timeout = Long.parseLong(str_timeout);
+     } catch( NumberFormatException e) {
+       // Keep the default, but make the misconfiguration visible
+       log("ManagedProcess.startInitializationTimer",
+           "Invalid value for ducc.agent.launcher.process.init.timeout:"+str_timeout+" - using default:"+defaultTimeout);
+     }
+   }
+   if ( timeout < 0 ) {
+     // Timer.schedule() rejects a negative delay with IllegalArgumentException
+     log("ManagedProcess.startInitializationTimer",
+         "Negative value for ducc.agent.launcher.process.init.timeout:"+timeout+" - using default:"+defaultTimeout);
+     timeout = defaultTimeout;
+   }
+   initTimer = new Timer();
+   initTimer.schedule( new InitializationTask(), timeout);
+ }
+
+ /**
+ * Stops service initialization timer and starts a new timer that will cleanup
+ * UIMA pipeline initialization stats. These stats are only needed during the
+ * initialization of a service.
+ */
+ public void stopInitializationTimer() {
+   if ( initTimer == null ) {
+     return; // the initialization timer was never started
+   }
+   initTimer.cancel();
+   // Cleanup is only needed when UIMA pipeline component stats are present
+   if ( getDuccProcess().getUimaPipelineComponents() == null || getDuccProcess().getUimaPipelineComponents().size() == 0 ) {
+     return;
+   }
+   final long defaultTimeout = 60000; // 60 seconds
+   long timeout = defaultTimeout;
+   String inv_publish_rate = System.getProperty("ducc.agent.node.inventory.publish.rate");
+   if ( inv_publish_rate != null ) {
+     try {
+       timeout = Long.parseLong(inv_publish_rate)*4; // allow sufficient time to publish init stats
+     } catch( NumberFormatException e) {
+       // Keep the default, but make the misconfiguration visible
+       log("ManagedProcess.stopInitializationTimer",
+           "Invalid value for ducc.agent.node.inventory.publish.rate:"+inv_publish_rate+" - using default:"+defaultTimeout);
+     }
+   }
+   if ( timeout < 0 ) {
+     // Timer.schedule() rejects a negative delay with IllegalArgumentException
+     log("ManagedProcess.stopInitializationTimer",
+         "Negative cleanup delay:"+timeout+" - using default:"+defaultTimeout);
+     timeout = defaultTimeout;
+   }
+   cleanupTimer = new Timer();
+   cleanupTimer.schedule( new CleanupTask(), timeout); // timeout and remove UIMA pipeline stats
+ }
+ /**
+  * Timer task scheduled by startInitializationTimer(). When it runs it cancels
+  * the initialization timer and reports InitializationTimeout to the observer.
+  */
+ private class InitializationTask extends TimerTask {
+   @Override
+   public void run() {
+     // One-shot task: cancel the owning timer first
+     initTimer.cancel();
+     final ProcessState timedOut = ProcessState.InitializationTimeout;
+     notifyProcessObserver(timedOut);
+   }
+ }
+ /**
+  * Timer task scheduled by stopInitializationTimer(). When it runs it cancels
+  * the cleanup timer and sets the UIMA pipeline components of the process to null.
+  */
+ private class CleanupTask extends TimerTask {
+   @Override
+   public void run() {
+     cleanupTimer.cancel();
+     // The setter is invoked through the concrete DuccProcess type
+     final DuccProcess target = (DuccProcess) getDuccProcess();
+     target.setUimaPipelineComponents(null);
+   }
+ }
+
+ /**
+  * @return the processInfo
+  */
+ public IDuccStandardInfo getProcessInfo() {
+   return this.processInfo;
+ }
+
+ /**
+  * @param processInfo the processInfo to set
+  */
+ public void setProcessInfo(final IDuccStandardInfo processInfo) {
+   this.processInfo = processInfo;
+ }
+
+ /**
+  * @return the current value of the killAfterLaunch flag
+  */
+ public boolean killAfterLaunch() {
+   return this.killAfterLaunch;
+ }
+
+ /**
+  * @param killAfterLaunch the new value of the killAfterLaunch flag
+  */
+ public void killAfterLaunch(final boolean killAfterLaunch) {
+   this.killAfterLaunch = killAfterLaunch;
+ }
+
+ /**
+  * @return the future
+  */
+ public Future<?> getFuture() {
+   return this.future;
+ }
+
+ /**
+  * @param future the future to set
+  */
+ public void setFuture(final Future<?> future) {
+   this.future = future;
+ }
+
+ /**
+  * @return the processMemoryAssignment
+  */
+ public long getProcessMemoryAssignment() {
+   return this.processMemoryAssignment;
+ }
+
+ /**
+  * @return the socketEndpoint
+  */
+ public String getSocketEndpoint() {
+   return this.socketEndpoint;
+ }
+
+ /**
+  * @param socketEndpoint the socketEndpoint to set
+  */
+ public void setSocketEndpoint(final String socketEndpoint) {
+   this.socketEndpoint = socketEndpoint;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.Serializable;
+
+/**
+ * Serializable value holder pairing a process id with a {@link ServiceState}.
+ * Both properties start out as null until set.
+ */
+public class ManagedServiceInfo implements Serializable {
+  /** States a managed service can be reported in. */
+  public enum ServiceState { STARTING, INITIALIZING, READY, FAILED, STOPPING, STOPPED, KILLED }
+
+  private static final long serialVersionUID = 1L;
+
+  private String pid;
+  private ServiceState state;
+
+  /**
+   * @return the state
+   */
+  public ServiceState getState() {
+    return this.state;
+  }
+
+  /**
+   * @param state the state to set
+   */
+  public void setState(final ServiceState state) {
+    this.state = state;
+  }
+
+  /**
+   * @return the pid
+   */
+  public String getPid() {
+    return this.pid;
+  }
+
+  /**
+   * @param pid the pid to set
+   */
+  public void setPid(final String pid) {
+    this.pid = pid;
+  }
+
+  /**
+   * @return a string of the form {@code PID:<pid> State:<state>}
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("PID:");
+    text.append(getPid()).append(" State:").append(getState());
+    return text.toString();
+  }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ManagedServiceInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable description of a process: its identifiers, command, node,
+ * log locations, attachment flag, client/parent ids and an optional Throwable.
+ * Interface members are implicitly public, so the modifier is omitted.
+ */
+public interface Process extends Serializable {
+  // Log path
+  String getLogPath();
+  void setLogPath(String logPath);
+
+  // Process identifiers and command
+  String getPid();
+  void setPid(String pid);
+  List<String> getCommand();
+  String getProcessId();
+  void setProcessId(String processId);
+
+  // Port and node address
+  String getPort();
+  void setPort(String port);
+  String getNodeIp();
+  void setNodeIp( String nodeIp);
+
+  boolean isAgentProcess();
+// public void setOSProcess(java.lang.Process target );
+  void setLogStream(OutputStream ps);
+
+  String getNodeName();
+  void setNodeName(String nodeName);
+
+  void setAgentLogPath(String logPath);
+  String getAgentLogPath();
+
+  // Attachment flag; the setter takes no argument
+  boolean isAttached();
+  void setAttached();
+
+  void setClientId(String clientId);
+  String getClientId();
+
+  String getAbsoluteLogPath();
+  void setAbsoluteLogPath(String absoluteLogPath);
+
+  String getDescription();
+  void setDescription(String description);
+
+  void setParent( String parent);
+  String getParent();
+
+  Throwable getExceptionStackTrace();
+  void setExceptionStackTrace(Throwable exceptionStackTrace);
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/Process.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.launcher;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+
+public class ProcessStreamConsumer extends Thread {
+ public static final String LS = System.getProperty("line.separator");
+
+ private InputStream is;
+ private PrintStream os;
+ DuccLogger logger;
+ private StringBuffer errStreamBuffer = new StringBuffer();
+ public ProcessStreamConsumer(DuccLogger logger, ThreadGroup threadGroup,
+ final String threadName, InputStream is, PrintStream os) {
+ super(threadGroup, threadName);
+ this.os = os;
+ this.is = is;
+ this.logger = logger;
+ setDaemon(true);
+ }
+ public String getDataFromStream() {
+ return errStreamBuffer.toString();
+ }
+ public void run() {
+ try {
+
+ InputStreamReader in = new InputStreamReader(is);
+ BufferedReader reader = new BufferedReader(in);
+ String line;
+ while ((line = reader.readLine()) != null) {
+ line += LS;
+ if ( "StdErrorReader".equals(Thread.currentThread().getName())) {
+ logger.error("ProcessStreamConsumer.run()", null, line.trim());
+ os.print("ERR>>>"+line);
+ // Save stderr stream into a local buffer. When the process exits unexpectedly
+ // errors will be copied to a Process object.
+ errStreamBuffer.append(line.trim());
+ } else {
+ logger.info("ProcessStreamConsumer.run()", null, line.trim());
+ os.print("OUT>>>"+line);
+ }
+ }
+ } catch (Exception x) {
+ } finally {
+ try {
+ os.flush();
+ } catch (Exception e) {
+ System.out.println("Caught Exception While Flushing "
+ + Thread.currentThread().getName() + " Output Stream");
+ }
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/launcher/ProcessStreamConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java Wed Jan 2 19:02:18 2013
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * Base class for collectors that parse space-separated values out of the first
+ * 1024 bytes of a metric file. After {@link #parseMetricFile()} the offset and
+ * length of each located value within {@code metricFileContents} are available
+ * in {@code metricFieldOffsets} and {@code metricFieldLengths}.
+ */
+public abstract class AbstractMetricCollector implements MetricCollector {
+  private int howManyFields;
+  protected int[] metricFieldOffsets;
+  protected int[] metricFieldLengths;
+  protected byte[] metricFileContents = new byte[1024];
+  protected RandomAccessFile metricFile;
+  private int mField;
+  private int fieldOffset;
+  private int currentOffset=0;
+
+  /**
+   * @param metricFile file to read the metrics from
+   * @param howMany number of values to locate on each parse
+   * @param offset number of bytes to skip after each value; when greater than
+   *               zero the leading token (up to the first space) is skipped too
+   */
+  public AbstractMetricCollector(RandomAccessFile metricFile, int howMany, int offset) {
+    this.howManyFields = howMany;
+    this.fieldOffset = offset;
+    this.metricFile = metricFile;
+    metricFieldOffsets = new int[howMany];
+    metricFieldLengths = new int[howMany];
+  }
+
+  /**
+   * Re-reads the metric file from its beginning and locates the configured
+   * number of values. The method may be called repeatedly; each call starts
+   * from a clean parsing state. Values that would lie beyond the buffer are
+   * reported with length 0 instead of causing an index error.
+   *
+   * @throws IOException if seeking or reading the metric file fails
+   */
+  public void parseMetricFile() throws IOException {
+    // Reset the cursors. Without this a second call continued where the previous
+    // one stopped and overran the offset/length arrays.
+    currentOffset = 0;
+    mField = 0;
+    metricFile.seek(0);
+    int bytesRead = metricFile.read(metricFileContents);
+    // Zero everything beyond what was just read so leftovers of a previous,
+    // longer read are never parsed as data (read returns -1 on an empty file).
+    java.util.Arrays.fill(metricFileContents, Math.max(bytesRead, 0), metricFileContents.length, (byte) 0);
+    if ( fieldOffset > 0 ) {
+      // Advance the pointer just beyond the field name
+      while (currentOffset < metricFileContents.length && metricFileContents[currentOffset] != ' ') {
+        ++currentOffset;
+      }
+    }
+    for ( int i = 0; i < howManyFields; i++ ) {
+      readNextField();
+    }
+  }
+
+  /**
+   * Locates the next value starting at the current offset and records where it
+   * starts and how long it is. All scans stop at the end of the buffer.
+   */
+  private void readNextField() {
+    final int limit = metricFileContents.length;
+    // Skip column padding.
+    while (currentOffset < limit && metricFileContents[currentOffset] == ' ') {
+      ++currentOffset;
+    }
+    // Find the end of the value.
+    int offset = currentOffset;
+    while (currentOffset < limit && metricFileContents[currentOffset] != ' ' && metricFileContents[currentOffset] != 0) {
+      ++currentOffset;
+    }
+    // Store the value's offset and length.
+    metricFieldOffsets[mField] = offset;
+    metricFieldLengths[mField++] = currentOffset - offset;
+    // Skip to the next value, never moving past the end of the buffer.
+    currentOffset = Math.min(currentOffset + fieldOffset, limit);
+  }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/AbstractMetricCollector.java
------------------------------------------------------------------------------
svn:eol-style = native