You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/08/27 13:31:17 UTC
svn commit: r1839320 [1/2] - in /uima/uima-ducc/trunk:
uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/
uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/
uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/ uima-ducc-...
Author: cwiklik
Date: Mon Aug 27 13:31:17 2018
New Revision: 1839320
URL: http://svn.apache.org/viewvc?rev=1839320&view=rev
Log:
UIMA-5814 add support for agent quiesce and stop
Added:
uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventQuiesceAndStop.java
uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStop.java
uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/TargetableDuccAdminEvent.java
Modified:
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStopMetrics.java
uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/component/AbstractDuccComponent.java
uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccAdmin.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/Utils.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/IProcessState.java
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Mon Aug 27 13:31:17 2018
@@ -31,11 +31,16 @@ import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -60,6 +65,8 @@ import org.apache.uima.ducc.agent.metric
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
+import org.apache.uima.ducc.common.admin.event.DuccAdminEventQuiesceAndStop;
+import org.apache.uima.ducc.common.admin.event.DuccAdminEventStop;
import org.apache.uima.ducc.common.admin.event.DuccAdminEventStopMetrics;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
@@ -113,7 +120,7 @@ public class NodeAgent extends AbstractD
public static String cgroupFailureReason;
// Map of known processes this agent is managing. This map is published
// at regular intervals as part of agent's inventory update.
- private Map<DuccId, IDuccProcess> inventory = new HashMap<DuccId, IDuccProcess>();
+ private HashMap<DuccId, IDuccProcess> inventory = new HashMap<DuccId, IDuccProcess>();
// Semaphore controlling access to inventory Map
private Semaphore inventorySemaphore = new Semaphore(1);
@@ -305,7 +312,7 @@ public class NodeAgent extends AbstractD
/*
* Remove ManagedProcess from map and send lifecycle event
*/
- private void processUndeploy(ManagedProcess mp) {
+ private void processUndeploy(ManagedProcess mp, Iterator<ManagedProcess> it) {
String location = "processUndeploy";
if(mp != null) {
if(!deployedProcesses.contains(mp)) {
@@ -315,7 +322,8 @@ public class NodeAgent extends AbstractD
else {
String args = "mp:"+mp.getProcessId();
logger.debug(location, jobid, args);
- deployedProcesses.remove(mp);
+ // deployedProcesses.remove(mp);
+ // it.remove();
IDuccProcess process = mp.getDuccProcess();
sendProcessLifecycleEventReport(process,LifecycleEvent.Terminate);
}
@@ -796,13 +804,23 @@ public class NodeAgent extends AbstractD
if (isAlive(agentManagedProcess)) {
// Stop the process if it has been deallocated
if (process.isDeallocated() ) {
- agentManagedProcess.setResourceState(ResourceState.Deallocated);
- logger.info(
- methodName,
- workDuccId,
- "<<<<<<<< Agent Stopping Process:" + process.getDuccId() + " PID:"
- + process.getPID() + " Reason: Ducc Deallocated the Process.");
- lifecycleController.stopProcess(agentManagedProcess);
+ // if agent is in stopping state, it will try to stop
+ // its processes.
+ if ( stopping ) {
+ logger.info(
+ methodName,
+ workDuccId,
+ ">>>>>>>> Agent is stopping - process with PID:"+process.getPID()+" is stopping");
+ return; // agent is stopping. All processes are stopping
+ } else {
+ agentManagedProcess.setResourceState(ResourceState.Deallocated);
+ logger.info(
+ methodName,
+ workDuccId,
+ "<<<<<<<< Agent Stopping Process:" + process.getDuccId() + " PID:"
+ + process.getPID() + " Reason: Ducc Deallocated the Process.");
+ lifecycleController.stopProcess(agentManagedProcess);
+ }
}
// else nothing to do. Process has been deallocated
}
@@ -831,6 +849,15 @@ public class NodeAgent extends AbstractD
+ process.getDuccId());
process.setReasonForStoppingProcess(IDuccProcess.ReasonForStoppingProcess.CommandLineMissing.name());
} else {
+ if ( stopping ) {
+ process.setProcessState(ProcessState.Rejected);
+ logger.info(
+ methodName,
+ workDuccId,
+ ">>>>>>> Agent Rejected Process:" + process.getDuccId()+
+ " Start Request - Agent is stopping");
+ return;
+ }
process.setProcessState(ProcessState.Starting);
logger.info(
methodName,
@@ -877,7 +904,10 @@ public class NodeAgent extends AbstractD
return false;
}
private boolean isOverSwapLimit(IDuccProcess process ) {
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while(it.hasNext() ) {
+ ManagedProcess deployedProcess = it.next();
+ //for (ManagedProcess deployedProcess : deployedProcesses) {
// Check if this process exceeds its alloted max swap usage
if ( deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId()) &&
process.getSwapUsage() > deployedProcess.getMaxSwapThreshold()
@@ -889,7 +919,10 @@ public class NodeAgent extends AbstractD
}
private long getSwapOverLimit(IDuccProcess process) {
long overLimit = 0;
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while(it.hasNext() ) {
+ //for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
if ( deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId()) ) {
overLimit = deployedProcess.getMaxSwapThreshold() - process.getSwapUsage();
}
@@ -936,7 +969,10 @@ public class NodeAgent extends AbstractD
public void interruptThreadInWaitFor(String pid) throws Exception {
String methodName="interruptZombieProcess";
synchronized (monitor) {
- for (ManagedProcess dProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while(it.hasNext() ) {
+ //for (ManagedProcess dProcess : deployedProcesses) {
+ ManagedProcess dProcess = it.next();
if ( dProcess.getPid() != null && dProcess.getPid().equals(pid) ) {
Future<?> future = dProcess.getFuture();
if ( future != null && !future.isDone() && !future.isCancelled()) {
@@ -1102,12 +1138,14 @@ public class NodeAgent extends AbstractD
} finally {
inventorySemaphore.release();
}
-
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while(it.hasNext() ) {
+ // for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
// Find ManagedProcess instance the DuccProcess instance is
// associated with
if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) {
- processUndeploy(deployedProcess);
+ processUndeploy(deployedProcess, it);
break;
}
}
@@ -1175,9 +1213,12 @@ public class NodeAgent extends AbstractD
}
ManagedProcess deployedProcess = null;
synchronized (monitor) {
- for (ManagedProcess dProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while(it.hasNext() ) {
+ //for (ManagedProcess dProcess : deployedProcesses) {
// Find ManagedProcess instance the DuccProcess
// instance is associated with
+ ManagedProcess dProcess = it.next();
if (dProcess.getDuccProcess().getDuccId().getUnique()
.equals(duccEvent.getDuccProcessId())) {
deployedProcess = dProcess;
@@ -1416,7 +1457,10 @@ public class NodeAgent extends AbstractD
String methodName = "deployProcess";
synchronized (monitor) {
boolean deployProcess = true;
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while ( it.hasNext() ) {
+ //for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
// ignore duplicate start request for the same process
if (deployedProcess.getDuccId().equals(process.getDuccId())) {
deployProcess = false;
@@ -1526,22 +1570,37 @@ public class NodeAgent extends AbstractD
* timer pops and child processes are still running, the agent takes
* itself out via halt()
*/
- private boolean stopChildProcesses() {
+ private boolean stopChildProcesses(boolean quiesceMode) {
String methodName = "stopNow";
boolean wait = false;
try {
- for (ManagedProcess deployedProcess : deployedProcesses) {
- String pid = deployedProcess.getDuccProcess().getPID();
- if (pid == null || pid.trim().length() == 0 || !runnable(deployedProcess) ) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext() ) {
+ ManagedProcess deployedProcess = it.next();
+ // for (ManagedProcess deployedProcess : deployedProcesses) {
+ String pid = deployedProcess.getDuccProcess().getPID();
+ logger.info(methodName, null, "....Process:"+pid+" is JD="+deployedProcess.isJd());
+ // when called with quiesce=true we dont kill JDs to allow the JP
+ // to send task completions. The JD's will be stopped when all JPs
+ // terminate.
+// if ( deployedProcess.isStopping() || ( quiesceMode && deployedProcess.isJd()) ) {
+// if ( deployedProcess.isStopping() ) {
+// continue;
+// }
+ if ( deployedProcess.isStopping() || pid == null || pid.trim().length() == 0 || !runnable(deployedProcess) ) {
continue;
}
+
logger.info(methodName, null, "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
- + " PID:" + pid+" Sending SIGTERM Process State:"+deployedProcess.getDuccProcess().getProcessState().toString());
+ + " PID:" + pid+" Sending SIGTERM Process State:"+deployedProcess.getDuccProcess().getProcessState().toString()
+ +" Process Type:"+ deployedProcess.getDuccProcess().getProcessType()
+ +" Uima AS:"+deployedProcess.isUimaAs());
wait = true;
deployedProcess.setStopPriority(StopPriority.DONT_WAIT);
// Stop each child process in its own thread to parallelize SIGTERM requests
ExecutorService executor = Executors.newSingleThreadExecutor();
- executor.execute( new ProcessRunner(deployedProcess) ); //pid,SIGNAL.SIGTERM));
+ executor.execute( new ProcessRunner(deployedProcess) );
+
}
} catch( Exception e) {
@@ -1550,19 +1609,43 @@ public class NodeAgent extends AbstractD
return wait;
}
- private void killChildProcesses() {
+ private void killChildProcesses(boolean killOnlyUimaAs ) {
String methodName = "killChildProcesses";
try {
if ( useCgroups ) {
logger.info("stop", null, "CgroupsManager.cleanup() before ");
- // Since SIGTERM may not be enough to take down a process, use cgroups to find
- // any process still standing and do hard kill
- cgroupsManager.cleanup();
+ if ( killOnlyUimaAs ) {
+ Set<String> pidsToKill = new HashSet<>();
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext() ) {
+ ManagedProcess p = it.next();
+ if ( p.getPid() != null && p.isUimaAs()) {
+ pidsToKill.add(p.getPid());
+ }
+ }
+ if ( !pidsToKill.isEmpty() ) {
+ System.out.println(">>>>>>>> Found "+pidsToKill.size()+" UIMA-AS processes still running - killing all via kill -9");
+ // Since SIGTERM may not be enough to take down a process, use cgroups to find
+ // any UIMA-AS process still standing and do hard kill
+ cgroupsManager.cleanupPids(pidsToKill);
+
+ }
+
+ } else {
+ // Since SIGTERM may not be enough to take down a process, use cgroups to find
+ // any process still standing and do hard kill
+ cgroupsManager.cleanup();
+
+ }
logger.info("stop", null, "CgroupsManager.cleanup() after ");
} else {
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext() ) {
+ ManagedProcess deployedProcess = it.next();
+
+ // for (ManagedProcess deployedProcess : deployedProcesses) {
String pid = deployedProcess.getDuccProcess().getPID();
if (pid == null || pid.trim().length() == 0 || !runnable(deployedProcess) ) {
continue;
@@ -1592,39 +1675,37 @@ public class NodeAgent extends AbstractD
}
- private void killIt(IDuccProcess process, ManagedProcess deployedProcess ) {
- String methodName = "killIt";
- // logger.info(methodName, null, "....Undeploying Process - DuccId:" + process.getDuccId()
- // + " PID:" + pid);
- //if (pid != null) {
- // Mark the process as stopping. When the process exits,
- // the agent can determine
- // if the process died on its own (due to say, user code
- // problem) or if it died
- // due to Agent initiated stop.
- deployedProcess.setStopping();
- // Agent will first send a stop request (via JMS) to the
- // process.
- // If the process doesnt stop within alloted window the
- // agent
- // will kill it hard
- ICommandLine cmdLine;
- try {
- if (Utils.isWindows()) {
- cmdLine = new NonJavaCommandLine("taskkill");
- cmdLine.addArgument("/PID");
- } else {
- cmdLine = new NonJavaCommandLine("/bin/kill");
- cmdLine.addArgument("-9");
- }
- cmdLine.addArgument(deployedProcess.getDuccProcess().getPID());
- launcher.launchProcess(this, getIdentity(), process, cmdLine, this, deployedProcess);
- } catch (Exception e) {
- logger.error(methodName, null, e);
- }
- //}
- }
- /**
+ private void killIt(IDuccProcess process, ManagedProcess deployedProcess) {
+ String methodName = "killIt";
+ // Mark the process as stopping. When the process exits,
+ // the agent can determine
+ // if the process died on its own (due to say, user code
+ // problem) or if it died
+ // due to Agent initiated stop.
+ deployedProcess.setStopping();
+ // Agent will first send a stop request (via JMS) to the
+ // process.
+ // If the process doesnt stop within alloted window the
+ // agent
+ // will kill it hard
+ ICommandLine cmdLine;
+ try {
+ if (Utils.isWindows()) {
+ cmdLine = new NonJavaCommandLine("taskkill");
+ cmdLine.addArgument("/PID");
+ } else {
+ cmdLine = new NonJavaCommandLine("/bin/kill");
+ cmdLine.addArgument("-9");
+ }
+ cmdLine.addArgument(deployedProcess.getDuccProcess().getPID());
+ launcher.launchProcess(this, getIdentity(), process, cmdLine, this, deployedProcess);
+ } catch (Exception e) {
+ logger.error(methodName, null, e);
+
+ }
+ }
+
+ /**
* Kills a given process
*
* @param process
@@ -1634,76 +1715,46 @@ public class NodeAgent extends AbstractD
String methodName = "undeployProcess";
synchronized (monitor) {
boolean processFound = false;
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while (it.hasNext() ) {
+ // for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccId().equals(process.getDuccId())) {
String pid = deployedProcess.getDuccProcess().getPID();
processFound = true;
if (deployedProcess.isStopping()) {
if ( isProcessRunning(deployedProcess.getDuccProcess())) {
logger.debug(methodName, null, "....Checking if Proces with PID:" + process.getPID()+" is Defunct");
-
+ // wait for a short while before running DefunctProcessDetector
+ // Sometimes when a process is killed via kill -9 it shows as
+ // defunct in ps output. Not sure why this is so.
+ try {
+ monitor.wait(500);
+ } catch(InterruptedException ee) {
+ }
+
// spin a thread where we check if the process is defunct. If true,
// the process state is changed to Stopped and reason set to 'defunct'.
// Next inventory publication will include this new state and the OR
// can terminate a job.
defunctDetectorExecutor.execute(new DefunctProcessDetector(deployedProcess, logger));
} else if ( pid != null && deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopping ) ) {
- logger.info(methodName, null, "....Undeploying Process - DuccId:" + process.getDuccId()
+ logger.info(methodName, null, ">>>> Undeploying Process - DuccId:" + process.getDuccId()
+ " PID:" + pid);
- killIt(process, deployedProcess);
- }
- //logger.debug(methodName, null, "....Process Already Stopping PID:" + process.getPID()+" Returning");
-
+ killIt(process, deployedProcess);
+ }
break; // this process is already in stopping state
}
-
-
-
- //logger.info(methodName, null, "....Undeploying Process - DuccId:" + process.getDuccId()
- // + " PID:" + pid);
if (pid != null) {
logger.info(methodName, null, "....Undeploying Process - DuccId:" + process.getDuccId()
+ " PID:" + pid);
killIt(process, deployedProcess);
- /*
-// try {
-// // stop collecting process stats from /proc/<pid>/statm
-// super.getContext().stopRoute(pid);
-// logger.info(methodName, null, "Stopped Camel Route Collecting Metrics For PID:"+pid);
-// } catch( Exception e) {
-// logger.error(methodName, null, "....Unable to stop Camel route for PID:" + pid);
-// }
- // Mark the process as stopping. When the process exits,
- // the agent can determine
- // if the process died on its own (due to say, user code
- // problem) or if it died
- // due to Agent initiated stop.
- deployedProcess.setStopping();
- // Agent will first send a stop request (via JMS) to the
- // process.
- // If the process doesnt stop within alloted window the
- // agent
- // will kill it hard
- ICommandLine cmdLine;
- try {
- if (Utils.isWindows()) {
- cmdLine = new NonJavaCommandLine("taskkill");
- cmdLine.addArgument("/PID");
- } else {
- cmdLine = new NonJavaCommandLine("/bin/kill");
- cmdLine.addArgument("-9");
- }
- cmdLine.addArgument(pid);
- launcher.launchProcess(this, getIdentity(), process, cmdLine, this, deployedProcess);
- } catch (Exception e) {
- logger.error(methodName, null, e);
- } */
+
} else if (!deployedProcess.getDuccProcess().getProcessState()
.equals(ProcessState.Stopped)) { // process
- // not
- // reported
+ // not reported
// its
// PID
// yet
@@ -1761,7 +1812,7 @@ public class NodeAgent extends AbstractD
}
// remove route from context, otherwise the routes accumulate over time causing memory leak
super.getContext().removeRoute(process.getPID());
- StringBuffer sb = new StringBuffer("\n");
+ StringBuilder sb = new StringBuilder("\n");
logger.info(
methodName,
null,
@@ -1787,16 +1838,26 @@ public class NodeAgent extends AbstractD
// reference to an object we need to remove from the list
// of deployed processes
- ManagedProcess deployedProcessRef = null;
+ // ManagedProcess deployedProcessRef = null;
// Find a matching ManagedProcess for provided IDuccProcess
// so that we can remove it from the list
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while(it.hasNext() ) {
+ //for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccProcess() != null
&& deployedProcess.getDuccProcess().equals(process)) {
- deployedProcessRef = deployedProcess;
+ //deployedProcessRef = deployedProcess;
+ logger.debug(methodName, null,
+ "----------------- Removing Stopped Process from Deployed List");
+ processUndeploy(deployedProcess, it);
+ logger.debug(methodName, null,
+ "----------------- Deployed Process List Size After Remove:"
+ + deployedProcesses.size());
break;
}
}
+ /*
if (deployedProcessRef != null) {
logger.debug(methodName, null,
"----------------- Removing Stopped Process from Deployed List");
@@ -1809,6 +1870,7 @@ public class NodeAgent extends AbstractD
"----------------- Process Exited but Not Found in List - Deployed Process List Size:"
+ deployedProcesses.size());
}
+ */
}
} catch (Exception e) {
logger.error(methodName, null, e);
@@ -1832,8 +1894,12 @@ public class NodeAgent extends AbstractD
public void shutdown(String reason) {
String methodName = "shutdown";
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext() ) {
+ // for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
try {
+
undeployProcess(deployedProcess.getDuccProcess());
} catch (Exception e) {
logger.error(methodName, null, e);
@@ -1852,7 +1918,10 @@ public class NodeAgent extends AbstractD
public boolean isManagedProcess(Set<NodeUsersCollector.ProcessInfo> processList,
NodeUsersCollector.ProcessInfo cpi) {
synchronized (monitor) {
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext() ) {
+ // for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccProcess() != null) {
// Check if process has been deployed but has not yet
// reported its PID.
@@ -1907,7 +1976,10 @@ public class NodeAgent extends AbstractD
// pid is rogue yet. Eventually, the launched process reports its
// PID
boolean foundDeployedProcessWithNoPID = false;
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext() ) {
+ //for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccProcess() != null) {
// Check if process has been deployed but has not yet
// reported its PID.
@@ -1996,82 +2068,143 @@ public class NodeAgent extends AbstractD
.process(nmp);
}
}
+ @Override
+ public void quiesceAndStop() throws Exception {
+ stop(true, -1);
+ }
+ private void stop(boolean quiesce, long waitTimeInSecs) throws Exception {
+ synchronized (stopLock) {
+ logger.info("stop", null, "Agent stop() - quiesce:"+quiesce);
+ if (stopping) {
+ return;
+ }
+ stopping = true;
+ stateChange(EventType.SHUTDOWN);
+ // Dispatch SIGTERM to all child processes
+ boolean wait = stopChildProcesses(quiesce);
+ if ( quiesce ) {
+ logger.info("stop", null, "Agent stopping managed processes");
+ long waitTime = 60; //default
+ try {
+ waitTime = Long.valueOf(configurationFactory.processStopTimeout);
+ // Normalize. The configurationFactory.processStopTimeout from
+ // ducc.properties is in millis. The code below expects secs.
+ waitTime = (waitTime/1000);
+ } catch( Exception e) {
+ }
+ // Version 2.10.2 of UIMA-AS does not support quiesce and stop,
+ // so we need to implement a wait-then-kill -9 strategy.
+ waitForChildProcessesToTerminateAndKill(wait, waitTime, true);
+ logger.info("stop", null,">>>>>>>>>>>> stop() waitForChildProcessesToTerminateAndKill() completed");
+ // wait for JP processes to terminate. Return only when all
+ // terminate.
+ //waitForChildProcessesToTerminate(true);
+ //logger.info("stop", null,">>>>>>>>>>>> stop() waitForChildProcessesToTerminate() completed");
+ // now stop JDs
+ //stopChildProcesses(false);
+ //logger.info("stop", null,">>>>>>>>>>>> stop() stopChildProcesses() completed");
+ // wait for JD processes to terminate. Return only when all
+ // terminate.
+ waitForChildProcessesToTerminate(false);
+ logger.info("stop", null,">>>>>>>>>>>> stop() waitForChildProcessesToTerminate() 2 completed");
+ } else {
+ logger.info("stop", null, "Agent stopping managed processes with reaper delay of "+waitTimeInSecs+" secs");
+ // wait for waitTimeInSecs seconds, then send SIGKILL to any process still standing
+ waitForChildProcessesToTerminateAndKill(wait, waitTimeInSecs, false);
+ }
+ // Send the agent's current inventory as the final inventory update
+// HashMap<DuccId, IDuccProcess> emptyMap = new HashMap<>();
+ DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory, getLastORSequence(), getIdentity());
+// DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap, getLastORSequence(), getIdentity());
+ ORDispatcher.dispatch(duccEvent);
+ logger.info("stop", null, "Agent published final inventory");
+
+ logger.info("stop", null, "Stopping Publishing Metrics and Inventory");
+
+ //configurationFactory.stopRoutes();
+ configurationFactory.stop();
+ logger.info("stop", null, "Reaper thread finished - calling super.stop()");
+ super.stop();
+ }
+
+ }
+ @Override
public void stop() throws Exception {
- synchronized(stopLock) {
- logger.info("stop", null, "Agent stop()");
- if (stopping) {
- return;
- }
- stopping = true;
-
- // Send an empty process map as the final inventory
- HashMap<DuccId, IDuccProcess> emptyMap =
- new HashMap<DuccId, IDuccProcess>();
- DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap,getLastORSequence(), getIdentity());
- ORDispatcher.dispatch(duccEvent);
- logger.info("stop", null, "Agent published final inventory");
- stateChange(EventType.SHUTDOWN);
-
- configurationFactory.stopRoutes();
-
- logger.info("stop", null, "Agent stopping managed processes");
- // Dispatch SIGTERM to all child processes
- boolean wait = stopChildProcesses();
-
- // Stop publishing inventory. Once the route is down the agent forces last publication
- // sending an empty process map.
- //configurationFactory.stopInventoryRoute();
-
- if ( wait && deployedProcesses.size() > 0 ) {
- logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size());
- // wait for awhile
- synchronized (this) {
- long waittime = 60000;
- if (configurationFactory.processStopTimeout != null ) {
- try {
- waittime = Long.parseLong(configurationFactory.processStopTimeout);
- } catch( NumberFormatException e) {
- logger.warn("stop", null, e);
- }
- }
- logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory.");
- wait(waittime);
- }
- }
+ // not quiesce, 60=default timeout before kill -9 is used
+ stop(false, 60);
+ }
+
+ private void waitForChildProcessesToTerminateAndKill(boolean wait, long waitTimeInSecs, boolean killJustUimaAs) throws Exception {
+ if (wait && !deployedProcesses.isEmpty()) {
+ logger.info("waitForChildProcessesToTerminateAndKill", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"
+ + deployedProcesses.size());
+
+ Timer timer = new Timer(true);
+ logger.info("waitForChildProcessesToTerminateAndKill", null, "Waiting", waitTimeInSecs, " secs before sending kill -9 to all child processes still running");
+ CountDownLatch completionLatch = new CountDownLatch(1);
+ // start a timer task which when triggered kills processes via kill -9
+ timer.schedule(new KillTimerTask(completionLatch, killJustUimaAs), waitTimeInSecs*1000);
+
+ // block this thread until killer task finishes its work
+ completionLatch.await();
+
+ // wait for awhile
+// synchronized (this) {
+// long waittime = 60000;
+// if (configurationFactory.processStopTimeout != null) {
+// try {
+// waittime = Long.parseLong(configurationFactory.processStopTimeout);
+// } catch (NumberFormatException e) {
+// logger.warn("stop", null, e);
+// }
+// }
+// logger.info("stop", null, "Waiting", waitTimeInSecs, " secs to send final NodeInventory.");
+// this.wait(waitTimeInSecs*1000);
+// }
+ }
+ // send kill -9 to any child process still running
+ //killChildProcesses();
+ stopLock.wait(1000);
+ logger.info("waitForChildProcessesToTerminateAndKill", null, "Done");
- // send kill -9 to any child process still running
- killChildProcesses();
-/*
- // Self destruct thread in case we loose AMQ broker and AMQ listener gets into retry
- // mode trying to recover a connection
- Thread t = new Thread( new Runnable() {
- public void run() {
- try {
- logger.info("stop", null, "Agent waiting for additional 10 seconds to allow for a clean shutdown before terminating itself via System.exit(1) ");
- Thread.sleep(10000);
- } catch(Exception e ) {
- logger.info("stop", null, e);
- } finally{
- logger.info("stop", null, "Agent calling System.exit(1) ... ");
- System.exit(1);
- }
- }
- });
- t.start();
- t.join(10000);
- super.stop();
- logger.info("stop", null, "Reaper thread finished - calling super.stop()");
- }
-*/
- stopLock.wait(2000);
- super.stop();
- logger.info("stop", null, "Reaper thread finished - calling super.stop()");
- }
}
+ private void waitForChildProcessesToTerminate(boolean quiesceMode) throws Exception {
+ logger.info("waitForChildProcessesToTerminate", null, "Agent Sent SIGTERM to Child Processes - Waiting for them to Quiesce - Number of Deployed Processes:"
+ + deployedProcesses.size());
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext()) {
+ ManagedProcess p = it.next();
+ // don't wait for JDs to stop. In quiesce mode we keep them
+ // running until all JPs terminate and only then do we stop them
+// if ( quiesceMode && ( p.isJd() || p.isUimaAs() ) ) {
+ // in quiesce mode, skip UIMA-AS processes since currently
+ // there is no support for quiesce there. So only wait for
+ // POPs (JD), UIMA based JPs, and Services
+ if ( quiesceMode && p.isUimaAs() ) {
+ continue;
+ }
+ // block waiting for process to terminate
+ p.getFuture().get();
+
+ }
+// for (ManagedProcess p : deployedProcesses) {
+// // dont wait for JDs to stop. In quiesce mode we keep them
+// // running until all JPs terminate and only than we stop them
+// if ( quiesceMode && p.isJd()) {
+// continue;
+// }
+// p.getFuture().get();
+// }
+
+ }
+
public Future<?> getDeployedJPFuture(IDuccId duccId) {
- for (ManagedProcess deployedProcess : deployedProcesses) {
+ Iterator<ManagedProcess> it = deployedProcesses.iterator();
+ while( it.hasNext() ) {
+ //for (ManagedProcess deployedProcess : deployedProcesses) {
+ ManagedProcess deployedProcess = it.next();
// ignore duplicate start request for the same process
if (deployedProcess.getDuccId().equals(duccId)) {
return deployedProcess.getFuture();
@@ -2325,7 +2458,90 @@ public class NodeAgent extends AbstractD
return logger;
}
+ private void handleQuiesceAndStopEvent(DuccAdminEventQuiesceAndStop event) {
+ logger.info("handleQuiesceAndStopEvent", null, "... Agent Received an Admin Request to Stop");
+ try {
+ stop(true, -1);
+ } catch (Exception e) {
+ logger.info("handleQuiesceAndStopEvent", null, e);
+ }
+
+ }
+
+ private void handleStopEvent(DuccAdminEventStop event) {
+ logger.info("handleStopEvent", null, "... Agent Received an Admin Request to Stop");
+ try {
+
+ stop(false, event.getTimeout());
+ } catch (Exception e) {
+ logger.info("handleStopEvent", null, e);
+ }
+
+ }
+
+ private void handleStopPublishingEvent(DuccAdminEventStopMetrics event) {
+ if ( isThisTargetNode( getTargetNodes(event.getTargets()) ) ) {
+ logger.info("handleStopPublishingEvent", null,
+ "... Agent Received an Admin Request to Stop Metrics Collection and Publishing");
+ // Stop Camel route responsible for driving collection and publishing of metrics
+ configurationFactory.stopMetricsRoute();
+ logger.info("handleStopPublishingEvent", null,
+ "... Agent Stopped Metrics Collection and Publishing");
+ }
+
+ }
+
+ private String[] getTargetNodes(String targets) {
+ logger.info("getTargetNodes", null, " Targets for Admin Command:"+targets);
+
+ return targets.split(",");
+ }
+
+ private boolean isThisTargetNode(String[] nodes ) {
+ for (String targetNode : nodes) {
+ if (Utils.isMachineNameMatch(targetNode.trim(), getIdentity().getCanonicalName())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ private boolean isTarget(String[]targets ) {
+ for (String target : targets) {
+ String[] targetParts = target.trim().split("@");
+ logger.info("isTarget", null, " Targets for Admin Command:"+target+" This agent canonical identity:"+getIdentity().getCanonicalName()+" short name:"+getIdentity().getShortName());
+ if ( "agent".equals(targetParts[0])) {
+ if (Utils.isMachineNameMatch(targetParts[1].trim(), getIdentity().getShortName())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ @Override
public void handleAdminEvent(DuccAdminEvent event) throws Exception {
+
+ Thread t = new Thread() {
+ public void run() {
+ if (event instanceof DuccAdminEventStop) {
+ if ( isTarget( getTargetNodes(((DuccAdminEventStop)event).getTargets()) ) ) {
+ handleStopEvent((DuccAdminEventStop)event);
+ }
+ } else if (event instanceof DuccAdminEventQuiesceAndStop) {
+ if ( isTarget( getTargetNodes(((DuccAdminEventQuiesceAndStop)event).getTargets()) ) ) {
+ logger.info("handleAdminEvent", null,"Node a target for quiesce");
+ handleQuiesceAndStopEvent((DuccAdminEventQuiesceAndStop)event);
+ }
+ } else if (event instanceof DuccAdminEventStopMetrics) {
+ handleStopPublishingEvent((DuccAdminEventStopMetrics)event);
+ } else {
+ logger.info("handleAdminEvent", null, "... Agent Received Unexpected Message of Type:"
+ + event.getClass().getName());
+ }
+ }
+ };
+ t.start();
+
+ /*
if (event instanceof DuccAdminEventStopMetrics) {
// Get target machines from the message
String[] nodes = ((DuccAdminEventStopMetrics) event).getTargetNodes().split(",");
@@ -2349,5 +2565,26 @@ public class NodeAgent extends AbstractD
+ event.getClass().getName());
}
+ */
+ }
+ private class KillTimerTask extends TimerTask {
+
+ private CountDownLatch completionLatch;
+ private boolean killJustUimaAs;
+
+ public KillTimerTask(CountDownLatch completionLatch, boolean killOnlyUimaAs) {
+ this.completionLatch = completionLatch;
+ killJustUimaAs = killOnlyUimaAs;
+ }
+ @Override
+ public void run() {
+ try {
+ // send kill -9 to any child process still running
+ killChildProcesses(killJustUimaAs);
+ } finally {
+ completionLatch.countDown();
+ }
+ }
+
}
}
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java Mon Aug 27 13:31:17 2018
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -417,7 +418,8 @@ public class AgentConfiguration {
// spin a server thread which will handle AP state update messages
serverThread = new Thread( new Runnable() {
public void run() {
- while(true) {
+
+ while(!agent.isStopping()) {
try {
Socket client = serviceStateUpdateServer.accept();
// AP connected, get its status report. Handling of the status
@@ -432,7 +434,8 @@ public class AgentConfiguration {
} finally {
}
- }
+ }
+ logger.info("startAPServiceStateUpdateSocketServer", null, "State Update Server Stopped");
}
});
serverThread.start();
@@ -507,8 +510,15 @@ public class AgentConfiguration {
}
public void stopRoutes() throws Exception {
serviceStateUpdateServer.close();
- camelContext.stop();
- logger.info("AgentConfigureation.stopRoutes", null,"Camel Context stopped");
+ List<RouteDefinition> routes =
+ camelContext.getRouteDefinitions();
+ for( RouteDefinition rd : routes ) {
+ logger.info("AgentConfigureation.stopRoutes", null,"Stopping route:"+rd.getLabel()+" : "+rd.getId());
+ rd.stop();
+ logger.info("AgentConfigureation.stopRoutes", null,"Stopped route:"+rd.getLabel()+" : "+rd.getId());
+ }
+ // camelContext.stop();
+ // logger.info("AgentConfigureation.stopRoutes", null,"Camel Context stopped");
}
@Bean
@@ -570,7 +580,14 @@ public class AgentConfiguration {
logger.error(methodName, null, e);
}
}
-
+ public void stop() {
+ try {
+ serviceStateUpdateServer.close();
+
+ } catch( Exception e) {
+
+ }
+ }
private class DuccNodeFilter implements Predicate {
private NodeAgent agent = null;
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java Mon Aug 27 13:31:17 2018
@@ -88,27 +88,10 @@ public class AgentEventListener implemen
IDuccProcess process = jobDeployment.getJdProcess();
sb.append("\nJD--> JobId:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
}
- /*
- else {
- IDuccProcess process = jobDeployment.getJdProcess();
- String ip1 = process.getNodeIdentity().getIp();
- String ip2 = agent.getIdentity().getIp();
- sb.append("\nREJECTED: processIP="+ip1+" "+"agentIP="+ip2);
- sb.append("\nREJECTED: JD--> JobId:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
- }
- */
for( IDuccProcess process : jobDeployment.getJpProcessList() ) {
if ( isTargetNodeForProcess(process) ) {
sb.append("\n\tJob ID:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
}
- /*
- else {
- String ip1 = process.getNodeIdentity().getIp();
- String ip2 = agent.getIdentity().getIp();
- sb.append("\nREJECTED: processIP="+ip1+" "+"agentIP="+ip2);
- sb.append("\n\tREJECTED: Job ID:"+jobDeployment.getJobId()+" ProcessId:"+process.getDuccId()+" PID:"+process.getPID()+" Status:"+process.getProcessState() + " Resource State:"+process.getResourceState()+" isDeallocated:"+process.isDeallocated());
- }
- */
}
}
if ( sb.length() > 0 ) {
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java Mon Aug 27 13:31:17 2018
@@ -30,6 +30,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -63,7 +64,7 @@ public class CGroupsManager {
// legacy means that the cgonfig points to <cgroup location>/ducc
private boolean legacyCgConfig = false;
-
+ private Object waitForLockObject = new Object();
enum CGroupCommand {
CGSET("cgset"),
@@ -489,6 +490,128 @@ public class CGroupsManager {
}
}
}
+ private boolean isTargetForKill(Set<String> targets, String pid) {
+ Iterator<String> it = targets.iterator();
+ while( it.hasNext() ) {
+ if (pid.equals(it.next())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ /**
+ * Kills the given target PIDs found in ducc cgroups. Only cgroup
+ * folders with names that follow ducc's cgroup naming convention
+ * (<id>.<id>.<id>) are examined.
+ * For each such cgroup, the PIDs listed in its procs file are read
+ * and every PID that is also in pidsToKill is killed via kill -9.
+ * The method then polls until the procs file no longer lists any
+ * target PID, and attempts to remove the cgroup unless zombie
+ * processes were counted for it.
+ *
+ * @throws Exception
+ */
+ public void cleanupPids(Set<String> pidsToKill) throws Exception {
+
+ Set<NodeProcessInfo> processes = getProcessesOnNode();
+ // Match any folder under /cgroup/ducc that has syntax
+ // <number>.<number>.<number>
+ // This syntax is assigned by ducc to each cgroup
+ Pattern p = Pattern.compile("((\\d+)\\.(\\d+)\\.(\\d+))");
+
+ File cgroupsFolder = new File(getCGroupLocation(CGDuccMemoryPath));
+ String[] files = cgroupsFolder.list();
+ if ( files == null || files.length == 0 ) {
+ return;
+ }
+
+ for (String cgroupFolder : files) {
+ Matcher m = p.matcher(cgroupFolder);
+ // only look at ducc's cgroups
+ if (m.find()) {
+ try {
+ // open proc file which may include PIDs if processes are
+ // still running
+ File f = new File(getCGroupLocation(CGDuccMemoryPath) + cgroupFolder+ CGProcsFile);
+ // collect all pids
+ String[] pids = readPids(f);
+
+ if ( pids != null && pids.length > 0 ) {
+ agentLogger.info("cleanupOnStartup", null,"Agent found "+pids.length+" cgroup proceses still active. Proceeding to remove running processes");
+ }
+
+ int zombieCount=0;
+ // kill each running process via -9
+ if (pids != null && pids.length > 0) {
+ for (String pid : pids) {
+ if ( !isTargetForKill(pidsToKill, pid)) {
+ continue;
+ }
+ // Got cgroup processes still running. Kill them
+ for (NodeProcessInfo proc : processes) {
+ // Don't kill a zombie process as it is already dead. Just increment how many of them we have
+ if ( proc.isZombie() ) {
+ zombieCount++;
+ } else if (proc.getPid().equals(pid)) {
+ // kill process hard via -9
+ System.out.println(">>>>>> Killing target process "+proc.getPid());
+ kill( proc.getUserid(), proc.getPid(), NodeAgent.SIGKILL);
+ }
+ }
+ }
+ long logCount = 0;
+ // it may take some time for the cgroups to update accounting. Just cycle until
+ // the procs file becomes empty under a given cgroup
+ while( true ) {
+ boolean found = false;
+ pids = readPids(f);
+ for ( String pid : pids ) {
+ if ( isTargetForKill(pidsToKill, pid)) {
+ found = true;
+ break; // at least one process from the target list is still running
+ }
+ }
+
+ // if the cgroup contains no pids or there are only zombie processes don't wait
+ // for cgroup accounting. These processes will never terminate. The idea
+ // is not to enter into an infinite loop due to zombies
+ if ( !found || pids == null || pids.length == 0 || (zombieCount == pids.length)) {
+ break;
+ } else {
+ try {
+ synchronized(this) {
+ // log every ~30 minutes (10000 * 200), where 200 is a wait time in ms between tries
+ if ( logCount % 10000 == 0) {
+ agentLogger.info("cleanupOnStartup", null,
+ "--- CGroup:" + cgroupFolder+ " procs file still showing processes running. Wait until CGroups updates acccounting");
+ }
+ logCount++;
+ wait(200);
+
+ }
+ } catch( InterruptedException ee) {
+ break;
+ }
+ }
+ }
+ }
+ // Don't remove CGroups if there are zombie processes there. Otherwise, an attempt
+ // to remove the CGroup may hang a thread.
+ if ( zombieCount == 0 ) { // no zombies in the container
+ destroyContainer(cgroupFolder, SYSTEM, NodeAgent.SIGTERM);
+ agentLogger.info("cleanupOnStartup", null,
+ "--- Agent Removed Empty CGroup:" + cgroupFolder);
+ } else {
+ agentLogger.info("cleanupOnStartup", null,"CGroup "+cgroupFolder+" Contains Zombie Processing. Not Removing the Container");
+ }
+ } catch (FileNotFoundException e) {
+ // noop. Cgroup may have been removed already
+ } catch (Exception e) {
+ agentLogger.error("cleanupOnStartup", null, e);
+ }
+ }
+ }
+ }
public boolean isPidInCGroup(String pid) throws Exception {
String[] pids = getAllCGroupPids();
@@ -977,9 +1100,11 @@ public class CGroupsManager {
killChildProcesses(containerId, userId, NodeAgent.SIGTERM);
if ( childProcessCount > 0 ) {
agentLogger.info("destroyContainer", null, "Killed "+childProcessCount+"Child Processes with kill -15");
- try {
- this.wait(maxTimeToWaitForProcessToStop);
- } catch( InterruptedException ie) {
+ synchronized( waitForLockObject) {
+ try {
+ waitForLockObject.wait(maxTimeToWaitForProcessToStop);
+ } catch( InterruptedException ie) {
+ }
}
}
}
@@ -998,7 +1123,7 @@ public class CGroupsManager {
return true;
}
}
- return true; // nothing to do, cgroup does not exist
+ return true; // nothing to do, cgroup does not exist
} catch (Exception e) {
agentLogger.info("destroyContainer", null, e);
return false;
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java Mon Aug 27 13:31:17 2018
@@ -733,10 +733,12 @@ public class DuccCommandExecutor extends
isDucc20ServiceProcess = true;
}
//break;
- if (option.startsWith("-Dducc.deploy.JpType=uima")) {
+ if (option.trim().equals("-Dducc.deploy.JpType=uima")) {
// this is an AP with JP-based "nature". Meaning its using
// a JP process configuration and its pull based model.
uimaBasedAP = true;
+ } else if (option.trim().equals("-Dducc.deploy.JpType=uima-as")) {
+ ((ManagedProcess) super.managedProcess).setUimaAs();
}
}
@@ -750,6 +752,7 @@ public class DuccCommandExecutor extends
((JavaCommandLine)cmdLine).setClassName("org.apache.uima.ducc.user.common.main.DuccJobService");
//((JavaCommandLine)cmdLine).setClassName("org.apache.uima.ducc.ps.service.main.ServiceWrapper");
} else {
+ ((ManagedProcess) super.managedProcess).setUimaAs();
cmdLine.addOption("-Dducc.deploy.components=uima-as");
processEnv.put(IDuccUser.EnvironmentVariable.DUCC_UPDATE_PORT.value(), System.getProperty(ProcessStateUpdate.ProcessStateUpdatePort));
((JavaCommandLine)cmdLine).setClassName("org.apache.uima.ducc.common.main.DuccService");
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java Mon Aug 27 13:31:17 2018
@@ -140,6 +140,8 @@ public class ManagedProcess implements P
private long initializationTimeout;
+ private volatile boolean isUimaAs;
+
public ManagedProcess(IDuccProcess process, ICommandLine commandLine) {
this(process, commandLine, null, null, new ProcessMemoryAssignment());
}
@@ -791,4 +793,12 @@ public class ManagedProcess implements P
public boolean isJd() {
return isJD;
}
+
+ public void setUimaAs() {
+ isUimaAs = true;
+ }
+
+ public boolean isUimaAs() {
+ return isUimaAs;
+ }
}
Added: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventQuiesceAndStop.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventQuiesceAndStop.java?rev=1839320&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventQuiesceAndStop.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventQuiesceAndStop.java Mon Aug 27 13:31:17 2018
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.admin.event;
+
+public class DuccAdminEventQuiesceAndStop extends TargetableDuccAdminEvent {
+
+ private static final long serialVersionUID = 7550350057109505396L;
+
+ public DuccAdminEventQuiesceAndStop(String nodes, String user, byte[] auth_block) {
+ super(nodes, user, auth_block);
+ }
+
+
+
+}
Added: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStop.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStop.java?rev=1839320&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStop.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStop.java Mon Aug 27 13:31:17 2018
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.admin.event;
+
+public class DuccAdminEventStop extends TargetableDuccAdminEvent {
+
+ private static final long serialVersionUID = 3529112692721958292L;
+ private long timeout=60;
+
+ public DuccAdminEventStop(String nodes, long timeoutInSecs, String user, byte[] auth_block) {
+ super(nodes, user, auth_block);
+ timeout = timeoutInSecs;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+}
Modified: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStopMetrics.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStopMetrics.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStopMetrics.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/DuccAdminEventStopMetrics.java Mon Aug 27 13:31:17 2018
@@ -18,23 +18,12 @@
*/
package org.apache.uima.ducc.common.admin.event;
-public class DuccAdminEventStopMetrics extends DuccAdminEvent {
+public class DuccAdminEventStopMetrics extends TargetableDuccAdminEvent {
private static final long serialVersionUID = 6499822168988392919L;
- // comma separated list of nodes that are target for this message
- private String targetNodes = new String();
-
public DuccAdminEventStopMetrics(String nodes, String user, byte[] auth)
{
- super(user, auth);
- this.targetNodes = nodes;
- }
- /**
- * Returns comma separated list of target nodes for this message
- * @return
- */
- public String getTargetNodes() {
- return this.targetNodes;
+ super(nodes, user, auth);
}
}
Added: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/TargetableDuccAdminEvent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/TargetableDuccAdminEvent.java?rev=1839320&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/TargetableDuccAdminEvent.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/TargetableDuccAdminEvent.java Mon Aug 27 13:31:17 2018
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.admin.event;
+
+public abstract class TargetableDuccAdminEvent extends DuccAdminEvent {
+ private static final long serialVersionUID = 3941231616804023047L;
+ // comma separated list of nodes that are target for this message
+ private String targetList;
+
+
+ public TargetableDuccAdminEvent(String targetList, String user, byte[] auth) {
+ super(user, auth);
+ this.targetList = targetList;
+
+ }
+ /**
+ * Returns the comma-separated list of target nodes for this message
+ * @return the target list that was passed to the constructor
+ */
+ public String getTargets() {
+ return this.targetList;
+ }
+
+}
Modified: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/component/AbstractDuccComponent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/component/AbstractDuccComponent.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/component/AbstractDuccComponent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/component/AbstractDuccComponent.java Mon Aug 27 13:31:17 2018
@@ -45,8 +45,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.RouteDefinition;
import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
import org.apache.uima.ducc.common.admin.event.DuccAdminEventKill;
+import org.apache.uima.ducc.common.admin.event.DuccAdminEventQuiesceAndStop;
import org.apache.uima.ducc.common.crypto.Crypto;
import org.apache.uima.ducc.common.exception.DuccComponentInitializationException;
import org.apache.uima.ducc.common.exception.DuccConfigurationException;
@@ -404,14 +406,6 @@ public abstract class AbstractDuccCompon
dumpArgs(args);
dumpProps();
- /*
- if (System.getProperty("ducc.deploy.components") != null
- && !System.getProperty("ducc.deploy.components").equals("uima-as")
- && !System.getProperty("ducc.deploy.components").equals("job-process")
- && !System.getProperty("ducc.deploy.components").equals("service")
- && !System.getProperty("ducc.deploy.components").equals("jd")
- && (endpoint = System.getProperty("ducc.admin.endpoint")) != null) {
- */
if ( daemonProcess() && ( endpoint = System.getProperty("ducc.admin.endpoint")) != null ) {
logger.info("start", null, ".....Starting Admin Channel on endpoint:" + endpoint);
startAdminChannel(endpoint, this);
@@ -472,7 +466,17 @@ public abstract class AbstractDuccCompon
protected String getProcessJmxUrl() {
return processJmxUrl;
}
-
+
+ /**
+ * This method should be overridden if a component supports quiesce. By
+ * default (method not overridden), the quiesce works exactly like stop.
+ * Currently only agent overrides this method.
+ * @throws Exception
+ */
+ public void quiesceAndStop() throws Exception {
+ // default is stop
+ stop();
+ }
public void stop()
throws Exception
{
@@ -487,8 +491,10 @@ public abstract class AbstractDuccCompon
logger.info(methodName, null, "----------stop() called");
try {
- logger.info(methodName, null, "Stopping Camel Routes");
List<Route> routes = context.getRoutes();
+ if ( !routes.isEmpty()) {
+ logger.info(methodName, null, "Stopping Camel Routes");
+ }
for (Route route : routes) {
if ( !route.getId().startsWith("mina")) {
logger.info(methodName, null, "Stopping Route:"+route.getId());
@@ -496,17 +502,11 @@ public abstract class AbstractDuccCompon
route.getEndpoint().stop();
}
}
-
+ logger.info(methodName, null, "Stopping AMQ Consumers");
ActiveMQComponent amqc = (ActiveMQComponent) context.getComponent("activemq");
amqc.stop();
amqc.shutdown();
- /*
- if (!"Uima Process".equals(componentName)) {
- logger.info(methodName, null, "Stopping Camel Context");
- context.stop();
- logger.info(methodName, null, "Camel Context Stopped");
- }
- */
+ logger.info(methodName, null, "AMQ Consumers Stopped");
ObjectName name = new ObjectName(
"org.apache.uima.ducc.service.admin.jmx:type=DuccComponentMBean,name="
@@ -527,10 +527,6 @@ public abstract class AbstractDuccCompon
logger.info(methodName, null, "Component cleanup completed - terminating process");
} catch (Exception e) {
- // It's a sensitive time, let's emit twice just for luck
- System.out.println("----------------------------------------------------------------------------------------------------");
- e.printStackTrace();
- System.out.println("----------------------------------------------------------------------------------------------------");
logger.error(methodName, null, e);
}
@@ -548,17 +544,9 @@ public abstract class AbstractDuccCompon
}
}
- // System.exit(0);
}
- public void handleUncaughtException(Exception e) {
- e.printStackTrace();
- }
- public void handleUncaughtException(Error e) {
- e.printStackTrace();
- System.out.println("Unexpected Java Error - Terminating Process via Runtime halt");
- Runtime.getRuntime().halt(2);
- }
+
/**
* Start RMI registry so the JMX clients can connect to the JVM via JMX.
@@ -568,7 +556,7 @@ public abstract class AbstractDuccCompon
*/
public String startJmxAgent() throws Exception {
String location = "startJmxAgent";
- DuccId jobid = null;
+ //DuccId jobid = null;
String key = "com.sun.management.jmxremote.authenticate";
String value = System.getProperty(key);
logger.info(location, jobid, key+"="+value);
@@ -584,18 +572,9 @@ public abstract class AbstractDuccCompon
}
boolean done = false;
JMXServiceURL url = null;
- /*
- String jmxAccess = "local";
- String hostname = "localhost";
- if ( (jmxAccess = System.getProperty("ducc.jmx.access") ) != null && jmxAccess.equals("remote") ) {
- hostname = InetAddress.getLocalHost().getHostName();
- }
- RMIServerSocketFactory serverFactory = new RMIServerSocketFactoryImpl(InetAddress.getByName(hostname));
- */
// retry until a valid rmi port is found
while (!done) {
try {
- // LocateRegistry.createRegistry(rmiRegistryPort, null, serverFactory);
LocateRegistry.createRegistry(rmiRegistryPort);
done = true;
// Got a valid port
@@ -613,15 +592,6 @@ public abstract class AbstractDuccCompon
String s = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname,
rmiRegistryPort);
url = new JMXServiceURL(s);
- /*
- Map<String,Object> env = new HashMap<>();
- env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, serverFactory);
-
- RMIConnectorServer rmiServer = new RMIConnectorServer( new JMXServiceURL(url.toString()),
- env, ManagementFactory.getPlatformMBeanServer() );
-
- rmiServer.start();
- */
jmxConnector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
jmxConnector.start();
} catch (Exception e) {
@@ -637,10 +607,21 @@ public abstract class AbstractDuccCompon
public void cleanup(Throwable e) {
e.printStackTrace();
+ logger.error("handleUncaughtException", null, e);
+ }
+ public void handleUncaughtException(Exception e) {
+ e.printStackTrace();
+ logger.error("handleUncaughtException", null, e);
+ }
+ public void handleUncaughtException(Error e) {
+ e.printStackTrace();
+ logger.error("handleUncaughtException", null, e);
+ System.out.println("Unexpected Java Error - Terminating Process via Runtime halt");
+ Runtime.getRuntime().halt(2);
}
-
public void uncaughtException(final Thread t, final Throwable e) {
e.printStackTrace();
+ logger.error("uncaughtException", null, e);
System.exit(1);
}
@@ -654,7 +635,33 @@ public abstract class AbstractDuccCompon
public void process(final Exchange exchange) throws Exception {
logger.info("AdminEventProcessor.process()", null, "Received Admin Message of Type:"
+ exchange.getIn().getBody().getClass().getName());
-
+ if ( !"agent".equals(System.getProperty("ducc.deploy.components"))) {
+ if (exchange.getIn().getBody() instanceof DuccAdminEventKill) {
+ // start a new thread to process the admin kill event. Need to do this
+ // so that Camel thread associated with admin channel can go back to
+ // its pool. Otherwise, we will not be able to stop the admin channel.
+ Thread th = new Thread(new Runnable() {
+ public void run() {
+ try {
+ delegate.onDuccAdminKillEvent((DuccAdminEventKill) exchange.getIn().getBody());
+ } catch (Exception e) { // runs on a detached thread: nothing upstream will ever see this failure
+ logger.error("AdminEventProcessor.process()", null, e);
+ }
+ }
+ });
+ th.start();
+ } else {
+ handleAdminEvent((DuccAdminEvent) exchange.getIn().getBody());
+ }
+ } else {
+ // agent
+// String targets = "agent@bluejws65,agent@bluejbb";
+// DuccAdminEvent e =(DuccAdminEvent) exchange.getIn().getBody();
+// DuccAdminEventQuiesceAndStop event =
+// new DuccAdminEventQuiesceAndStop(targets,e.getUser(), e.getAuthBlock());
+ handleAdminEvent((DuccAdminEvent) exchange.getIn().getBody());
+ }
+ /*
if (exchange.getIn().getBody() instanceof DuccAdminEventKill) {
// start a new thread to process the admin kill event. Need to do this
// so that Camel thread associated with admin channel can go back to
@@ -672,6 +679,7 @@ public abstract class AbstractDuccCompon
} else {
handleAdminEvent((DuccAdminEvent) exchange.getIn().getBody());
}
+ */
}
}
Modified: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccAdmin.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccAdmin.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccAdmin.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccAdmin.java Mon Aug 27 13:31:17 2018
@@ -46,6 +46,8 @@ import org.apache.commons.cli.PosixParse
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
import org.apache.uima.ducc.common.admin.event.DuccAdminEventKill;
+import org.apache.uima.ducc.common.admin.event.DuccAdminEventQuiesceAndStop;
+import org.apache.uima.ducc.common.admin.event.DuccAdminEventStop;
import org.apache.uima.ducc.common.admin.event.DuccAdminEventStopMetrics;
import org.apache.uima.ducc.common.authentication.BrokerCredentials;
import org.apache.uima.ducc.common.authentication.BrokerCredentials.Credentials;
@@ -74,7 +76,7 @@ public class DuccAdmin extends AbstractD
.getProperty("file.separator");
public static enum DuccCommands {
- killAll, startAgents, quiesceAgents
+ killAll, startAgents, quiesceAgents, stop, quiesce
};
private String brokerUrl;
@@ -177,25 +179,28 @@ public class DuccAdmin extends AbstractD
public Options getPosixOptions() {
final Options posixOptions = new Options();
- posixOptions.addOption(DuccCommands.killAll.name(), false,
- "Kill All Ducc Processes");
+ posixOptions.addOption(DuccCommands.killAll.name(), false, "Kill All Ducc Processes");
@SuppressWarnings("static-access")
- Option startAgentsOption = OptionBuilder
- .hasArgs(2)
- .withDescription(
- "starting agents defined in arg1 using command defined in arg2")
- .create("startAgents");
+ Option startAgentsOption = OptionBuilder.hasArgs(2)
+ .withDescription("starting agents defined in arg1 using command defined in arg2").create("startAgents");
posixOptions.addOption(startAgentsOption);
@SuppressWarnings("static-access")
- Option quiesceAgentsOption = OptionBuilder
- .hasArgs(1)
- .withDescription(
- "quiescing agents defined in arg1")
- .create("quiesceAgents");
- posixOptions.addOption(quiesceAgentsOption);
-
+ Option quiesceAgentsOption = OptionBuilder.hasArgs(1).withDescription("quiescing agents defined in arg1")
+ .create("quiesceAgents");
+ posixOptions.addOption(quiesceAgentsOption);
+
+ @SuppressWarnings("static-access")
+ Option quiesceOption = OptionBuilder.hasArgs(1).withDescription("quiescing targets defined in arg1")
+ .create("quiesce");
+ posixOptions.addOption(quiesceOption);
+
+ @SuppressWarnings("static-access")
+ Option stopOption = OptionBuilder.hasArgs(2).withDescription("stopping targets defined in arg2")
+ .create("stop");
+ posixOptions.addOption(stopOption);
+
return posixOptions;
}
@@ -230,14 +235,31 @@ public class DuccAdmin extends AbstractD
System.out.println("DuccAdmin sent Kill to all Ducc processes ...");
}
+
private void quiesceAgents(String nodes) throws Exception {
- String user = System.getProperty("user.name");
- Crypto crypto = new Crypto(user, true);
- byte[] cypheredMessage = crypto.getSignature();
+ String user = System.getProperty("user.name");
+ Crypto crypto = new Crypto(user, true);
+ byte[] cypheredMessage = crypto.getSignature();
+
+ dispatch(serializeAdminEvent(new DuccAdminEventStopMetrics(nodes, user, cypheredMessage)));
+ }
+
+ public void quiesceAndStop(String nodes) throws Exception {
+ String user = System.getProperty("user.name");
+ Crypto crypto = new Crypto(user, true);
+ byte[] cypheredMessage = crypto.getSignature();
+
+ dispatch(serializeAdminEvent(new DuccAdminEventQuiesceAndStop(nodes, user, cypheredMessage)));
+ }
- dispatch(serializeAdminEvent(new DuccAdminEventStopMetrics(nodes, user, cypheredMessage)));
- System.out.println("DuccAdmin sent Quiesce request to Ducc Agents ...");
+ public void stop(String nodes, long waitTimeInSecs) throws Exception {
+ String user = System.getProperty("user.name");
+ Crypto crypto = new Crypto(user, true);
+ byte[] cypheredMessage = crypto.getSignature();
+ //System.out.println(">>>>>>>>>>> waitTime:"+waitTimeInSecs+" Targets:"+nodes);
+ dispatch(serializeAdminEvent(new DuccAdminEventStop(nodes, waitTimeInSecs, user, cypheredMessage)));
}
+
/**
* Return contents of the provided command file.
*
@@ -324,15 +346,26 @@ public class DuccAdmin extends AbstractD
killAll();
} else if (commandLine.hasOption(DuccCommands.startAgents.name())) {
System.out.println("---------- Starting Agents");
- String[] args = commandLine
- .getOptionValues(DuccCommands.startAgents.name());
+ String[] args = commandLine.getOptionValues(DuccCommands.startAgents.name());
startAgents(args[0], args[1]);
} else if (commandLine.hasOption(DuccCommands.quiesceAgents.name())) {
- System.out.println("---------- Quiescing Agents");
- String[] args = commandLine
- .getOptionValues(DuccCommands.quiesceAgents.name());
- quiesceAgents(args[0]);
- }
+ System.out.println("---------- Quiescing Agents");
+ String[] args = commandLine.getOptionValues(DuccCommands.quiesceAgents.name());
+ quiesceAgents(args[0]);
+ } else if (commandLine.hasOption(DuccCommands.quiesce.name())) {
+ System.out.println("---------- Quiescing Targets");
+ String[] args = commandLine.getOptionValues(DuccCommands.quiesce.name());
+ quiesceAndStop(args[0]);
+ } else if (commandLine.hasOption(DuccCommands.stop.name())) {
+ String[] args = commandLine.getOptionValues(DuccCommands.stop.name());
+ if ( args.length == 2) {
+ System.out.println("---------- Stopping Targets");
+ stop(args[1], Long.parseLong(args[0]) );
+ } else if ( args.length == 1) {
+ System.out.println("---------- Stopping Targets");
+ stop(args[0], 0);
+ }
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java Mon Aug 27 13:31:17 2018
@@ -49,6 +49,7 @@ import org.apache.uima.ducc.ps.service.t
import org.apache.uima.ducc.ps.service.transport.ITargetURI;
import org.apache.uima.ducc.ps.service.transport.http.HttpServiceTransport;
import org.apache.uima.ducc.ps.service.transport.target.TargetURIFactory;
+import org.apache.uima.ducc.ps.service.utils.Utils;
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
@@ -236,7 +237,7 @@ public class PullService implements ISer
// tasks.
protocolHandler.start();
// wait until all process threads terminate
- threadPool.awaitTermination(0, TimeUnit.MILLISECONDS);
+ //threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
waitForProcessThreads();
} catch(InterruptedException e) {
@@ -256,23 +257,32 @@ public class PullService implements ISer
public void stop() {
// process threads should stop first to avoid trying to pull new
// work while threads are running
- stopProcessThreadPool();
+ //stopProcessThreadPool();
+ logger.log(Level.INFO, "Stopping Process Thread Pool");
+ threadPool.shutdownNow(); // does not wait for the workers to exit
// close connection to remote client and cleanup
stopTransport();
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .stop()-transport stopped");
stopProtocolHandler(false);
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .stop()-protocol handler stopped");
stopServiceProcessor();
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .stop()-processor stopped");
// monitor should be stopped last to keep posting updates to observer
stopMonitor();
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .stop()-monitor stopped");
}
public void quiesceAndStop() {
// when quiescing, let the process threads finish processing
- stopProtocolHandler(true);
-
+ stopProtocolHandler(true); // true = quiesce
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .quiesceAndStop()-protocol handler stopped");
// close connection to remote client and cleanup
stopTransport();
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .quiesceAndStop()-transport stopped");
stopServiceProcessor();
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .quiesceAndStop()-processor stopped");
// monitor should be stopped last to keep posting updates to observer
stopMonitor();
+ System.out.println(">>>>>>>> "+Utils.getTimestamp()+" "+Utils.getShortClassname(this.getClass())+" .quiesceAndStop()-monitor stopped");
}
private void waitForProcessThreads() throws InterruptedException, ExecutionException {
for (Future<String> future : threadHandleList) {
@@ -313,7 +323,9 @@ public class PullService implements ISer
try {
logger.log(Level.INFO, "Stopping Process Thread Pool");
threadPool.shutdownNow();
- threadPool.awaitTermination(0, TimeUnit.MILLISECONDS);
+
+ // below probably not needed since this is done in start()
+ threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
logger.log(Level.INFO, "Process Thread Pool Stopped");
} catch (InterruptedException e) {
@@ -342,7 +354,7 @@ public class PullService implements ISer
}
}
private void stopTransport() {
- transport.stop();
+ transport.stop(false); // !quiesce
}
public static void main(String[] args) {
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java Mon Aug 27 13:31:17 2018
@@ -227,6 +227,7 @@ public class ServiceWrapper implements A
public void stop() {
try {
service.stop();
+ System.out.println(">>>>>> ServiceWrapper stopping JMX Agent");
jmxAgent.stop();
} catch( Exception e ) {
logger.log(Level.WARNING,"",e);
@@ -236,10 +237,11 @@ public class ServiceWrapper implements A
}
public void quiesceAndStop() {
try {
+
+ service.quiesceAndStop();
logger.log(Level.INFO,"Stoppng JMX Agent");
jmxAgent.stop();
- service.quiesceAndStop();
} catch( Exception e ) {
logger.log(Level.WARNING,"",e);
@@ -270,8 +272,10 @@ public class ServiceWrapper implements A
@Override
public void run() {
try {
- logger.log(Level.INFO, "Pull Service Caught SIGTERM Signal - Stopping (Quiescing) ...");
-
+ // Use System.out here since the logger may have already closed
+ // its streams. Logger's shutdown hook could have run by now.
+ System.out.println("Pull Service Caught SIGTERM Signal - Stopping (Quiescing) ...");
+ //logger.log(Level.INFO, "Pull Service Caught SIGTERM Signal - Stopping (Quiescing) ...");
serviceWrapper.quiesceAndStop();
} catch (Exception e) {
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java?rev=1839320&r1=1839319&r2=1839320&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java Mon Aug 27 13:31:17 2018
@@ -451,18 +451,20 @@ public class UimaAsServiceProcessor exte
public void stop() {
synchronized (UimaAsServiceProcessor.class) {
if (brokerRunning) {
- logger.log(Level.INFO, "Stopping UIMA_AS Client");
+ logger.log(Level.INFO, "Stopping UIMA-AS Client");
+ System.out.println("Stopping UIMA-AS Client");
try {
// Prevent UIMA-AS from exiting
System.setProperty("dontKill", "true");
uimaASClient.stop();
-
+ System.out.println("UIMA-AS Client Stopped");
Method brokerStopMethod = classToLaunch.getMethod("stop");
brokerStopMethod.invoke(brokerInstance);
Method waitMethod = classToLaunch.getMethod("waitUntilStopped");
waitMethod.invoke(brokerInstance);
brokerRunning = false;
+ System.out.println("Internal Broker Stopped");
super.stop();
} catch (Exception e) {