You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:02:19 UTC

svn commit: r1427906 [4/5] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-agent: main/ main/java/ main/java/org/ main/java/org/apache/ main/java/org/apache/uima/ main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/agent/ main/java/org/apache/u...

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableMemoryCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableMemoryCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableMemoryCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableMemoryCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.memory.NodeMemory;
+
+
+public interface CallableMemoryCollector extends Callable<NodeMemory>{
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableMemoryCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableNodeUsersCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableNodeUsersCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableNodeUsersCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableNodeUsersCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
+
+public interface CallableNodeUsersCollector extends Callable<TreeMap<String,NodeUsersInfo>>{
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/CallableNodeUsersCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeLoadAverageCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeLoadAverageCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeLoadAverageCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeLoadAverageCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.node.metrics.DefaultNodeLoadAverageInfo;
+import org.apache.uima.ducc.common.node.metrics.NodeLoadAverage;
+
+
+public class DefaultNodeLoadAverageCollector  
+    implements Callable<NodeLoadAverage>, Serializable {
+  
+    /**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	
+    public DefaultNodeLoadAverageCollector() {
+    }
+    public NodeLoadAverage call() throws Exception {
+        return new DefaultNodeLoadAverageInfo(new double[] {0.0,0.0,0.0});
+    }
+
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeLoadAverageCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeMemoryCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeMemoryCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeMemoryCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeMemoryCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.uima.ducc.common.agent.metrics.memory.DuccNodeMemory;
+import org.apache.uima.ducc.common.agent.metrics.memory.NodeMemory;
+
+
+public class DefaultNodeMemoryCollector implements CallableMemoryCollector{
+	private static final String TOTAL_MEMORY_SIZE="getTotalPhysicalMemorySize";
+	private static final String FREE_PHYSICAL_MEMORY_SIZE="getFreePhysicalMemorySize";
+	private static final String FREE_SWAP_MEMORY_SIZE="getFreeSwapSpaceSize";
+	private static final String TOTAL_SWAP_SPACE_SIZE="getTotalSwapSpaceSize";
+	private long fakeMemorySize = -1;
+
+	public DefaultNodeMemoryCollector() {
+		String tmp;
+		if ((tmp = System
+				.getProperty("ducc.agent.node.metrics.fake.memory.size")) != null) {
+			try {
+				fakeMemorySize = Long.parseLong(tmp);
+			} catch (NumberFormatException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+	  public NodeMemory call() throws Exception {
+		    return collect();
+	  }
+	  private void setMetric(String metricName, long metricValue, DuccNodeMemory nodeMemory) {
+		  try {
+              if ( fakeMemorySize != -1 ) {
+                  nodeMemory.setMemTotal(fakeMemorySize);
+              } else {
+                  nodeMemory.setMemTotal(Runtime.getRuntime().totalMemory());
+              }
+		  } catch( Exception e) {
+			  e.printStackTrace();
+		  }
+		  if ( metricName.equalsIgnoreCase(FREE_PHYSICAL_MEMORY_SIZE)) {
+			  nodeMemory.setMemFree(metricValue);
+		  } else if ( metricName.equalsIgnoreCase(FREE_SWAP_MEMORY_SIZE)) {
+			  nodeMemory.setSwapFree(metricValue);
+		  } else if ( metricName.equalsIgnoreCase(TOTAL_SWAP_SPACE_SIZE)) {
+			  nodeMemory.setSwapTotal(metricValue);
+		  } 
+	  }
+	  private DuccNodeMemory collect() {
+		  DuccNodeMemory nodeMemory = new DuccNodeMemory();
+		  OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
+		  for (Method method : operatingSystemMXBean.getClass().getDeclaredMethods()) {
+		    method.setAccessible(true);
+		    if (method.getName().startsWith("get") 
+		        && Modifier.isPublic(method.getModifiers())) {
+		            Long value;
+		        try {
+		            value = (Long)method.invoke(operatingSystemMXBean);
+		        } catch (Exception e) {
+		            value = 0L;
+		        } // try
+		        setMetric(method.getName(), value, nodeMemory);
+		    } // if
+		  } // for
+		  return nodeMemory;
+		}
+
+	  public static void main(String[] args) {
+		  try {
+			  DefaultNodeMemoryCollector collector = new DefaultNodeMemoryCollector();
+			  ExecutorService pool = Executors.newFixedThreadPool(1);
+		      Future<NodeMemory> nmiFuture = pool.submit(collector);
+		      NodeMemory nodeMemory = nmiFuture.get();
+		      System.out.println("Total Memory:"+nodeMemory.getMemTotal());
+		      System.out.println("Total Free Memory:"+nodeMemory.getMemFree());
+		      System.out.println("Total Swap Space Size:"+nodeMemory.getSwapTotal());
+		      System.out.println("Total Free Swap Space Size:"+nodeMemory.getSwapFree());
+		      
+		  } catch( Exception e) {
+			  e.printStackTrace();
+		  }
+	  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DefaultNodeMemoryCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DuccGarbageStatsCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DuccGarbageStatsCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DuccGarbageStatsCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DuccGarbageStatsCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+
+
+public class DuccGarbageStatsCollector {
+  MBeanServerConnection connection = null;
+  DuccLogger logger=null;
+  IDuccProcess process=null;
+	public DuccGarbageStatsCollector(DuccLogger logger,IDuccProcess process)  {
+	  this.logger = logger;
+	  this.process = process;
+	  try {
+	    connection = getServerConnection();
+	  } catch( Exception e) {
+	    logger.error("DuccGarbageStatsCollector.ctor", null, "Failed to Connect via JMX to PID:"+process.getPID()+" Reason:\n"+e);
+	  }
+	  
+	}
+  private MBeanServerConnection getServerConnection() throws Exception {
+    System.out.println("Connecting Monitor To Broker - URL:"+process.getProcessJmxUrl());
+    JMXServiceURL url = new JMXServiceURL(process.getProcessJmxUrl());
+    JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
+    return jmxc.getMBeanServerConnection();
+  }
+	public ProcessGarbageCollectionStats collect() {
+    ProcessGarbageCollectionStats gcStats =
+            new ProcessGarbageCollectionStats();
+	  
+	  try {
+	    Set<ObjectInstance> mbeans= 
+	            connection.queryMBeans(new ObjectName("java.lang:type=GarbageCollector,*"),null );
+	    Long totalCollectionCount= new Long(0);
+	    Long totalCollectionTime=new Long(0);
+	    
+	    for( ObjectInstance gcObject : mbeans) {
+	      String gcCollectorName = gcObject.getObjectName().getCanonicalKeyPropertyListString();
+	      ObjectName memoryManagerMXBean = 
+	              new ObjectName("java.lang:" + gcCollectorName);
+	      totalCollectionCount =+ (Long) connection.getAttribute(memoryManagerMXBean,"CollectionCount");
+	      totalCollectionTime =+ (Long) connection.getAttribute(memoryManagerMXBean,"CollectionTime");
+	    }
+      // Returns the total number of collections that have occurred.
+      gcStats.setCollectionCount(totalCollectionCount);
+      // Returns the approximate accumulated collection elapsed time in milliseconds.
+      gcStats.setCollectionTime(totalCollectionTime);
+
+	    
+	  } catch( Exception e) {
+	    logger.error("", null, "Failed to Fetch JMX GC Stats From PID:"+process.getPID()+" Reason:\n"+e);
+	  }
+	  
+	  
+//	   List<GarbageCollectorMXBean> gcmb = ManagementFactory.getGarbageCollectorMXBeans();
+//	   for( GarbageCollectorMXBean gcBean : gcmb ) {
+//		  gcStats.setMemoryManagerName(gcBean.getName());
+//		  // Returns the total number of collections that have occurred.
+//		  gcStats.setCollectionCount(gcBean.getCollectionCount());
+//		  // Returns the approximate accumulated collection elapsed time in milliseconds.
+//		  gcStats.setCollectionTime(gcBean.getCollectionTime());
+	  //}
+	  return gcStats;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/DuccGarbageStatsCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/MetricCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/MetricCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/MetricCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/MetricCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.IOException;
+
+public interface MetricCollector {
+  public void parseMetricFile()  throws IOException;
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/MetricCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.node.metrics.NodeCpuInfo;
+
+
+
+public class NodeCpuCollector implements Callable<NodeCpuInfo> {
+
+  public NodeCpuInfo call() throws Exception {
+    OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
+    return new NodeCpuInfo(osBean.getAvailableProcessors());
+  }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUsageCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUsageCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUsageCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUsageCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.cpu.DuccNodeCpuUsage;
+import org.apache.uima.ducc.common.agent.metrics.cpu.NodeCpuUsage;
+
+
+public class NodeCpuUsageCollector extends AbstractMetricCollector
+implements Callable<NodeCpuUsage>{
+	
+	public NodeCpuUsageCollector(RandomAccessFile fileHandle,
+			int howMany, int offset) {
+		super(fileHandle, howMany, offset);
+	}
+
+	public NodeCpuUsage call() throws Exception {
+		super.parseMetricFile();
+		return new DuccNodeCpuUsage(super.metricFileContents,
+				super.metricFieldOffsets, super.metricFieldLengths);
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUsageCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUtilizationCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUtilizationCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUtilizationCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUtilizationCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.cpu.DuccNodeCpuUsage;
+import org.apache.uima.ducc.common.agent.metrics.cpu.NodeCpuUsage;
+
+
+public class NodeCpuUtilizationCollector extends AbstractMetricCollector
+implements Callable<NodeCpuUsage>{
+	
+	public NodeCpuUtilizationCollector(RandomAccessFile fileHandle,
+			int howMany, int offset) {
+		super(fileHandle, howMany, offset);
+	}
+	public NodeCpuUsage call() throws Exception {
+		super.parseMetricFile();
+		return new DuccNodeCpuUsage(super.metricFileContents,
+				super.metricFieldOffsets, super.metricFieldLengths);
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeCpuUtilizationCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeLoadAverageCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeLoadAverageCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeLoadAverageCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeLoadAverageCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.node.metrics.NodeLoadAverage;
+import org.apache.uima.ducc.common.node.metrics.NodeLoadAverageInfo;
+
+
+public class NodeLoadAverageCollector extends AbstractMetricCollector 
+implements Callable<NodeLoadAverage>{
+  
+  public NodeLoadAverageCollector(RandomAccessFile metricFile,  int howMany, int offset) {
+    super(metricFile, howMany, offset);
+  }
+
+  public NodeLoadAverage call() throws Exception {
+    super.parseMetricFile();
+    return new NodeLoadAverageInfo(super.metricFileContents, super.metricFieldOffsets, super.metricFieldLengths);
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeLoadAverageCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeMemInfoCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeMemInfoCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeMemInfoCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeMemInfoCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.uima.ducc.common.agent.metrics.memory.NodeMemory;
+import org.apache.uima.ducc.common.node.metrics.NodeMemoryInfo;
+
+
+public class NodeMemInfoCollector implements CallableMemoryCollector {
+	private long fakeMemorySize = -1;
+	private String[] targetFields;
+
+	public NodeMemInfoCollector(String[] targetFields) {
+		this.targetFields = targetFields;
+		String tmp;
+		if ((tmp = System
+				.getProperty("ducc.agent.node.metrics.fake.memory.size")) != null) {
+			try {
+				fakeMemorySize = Long.parseLong(tmp);
+			} catch (NumberFormatException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+	public NodeMemory call() throws Exception {
+
+		BufferedReader fileReader = new BufferedReader(new FileReader(
+				"/proc/meminfo"));
+		// the order of fields corresponds to the field label position
+		long memInfoValues[] = new long[targetFields.length];
+		try {
+			String line;
+			// Read each line from meminfo file
+			while ((line = fileReader.readLine()) != null) {
+				// parse line and remove spaces
+				String[] parts = line.trim().split("\\s+");
+
+				// ignore lines that contain fields we dont need. The
+				// targetFields array
+				// contains labels of fields we are interested in. For each line
+				// read
+				// from file try to find a match.
+				for (int i = 0; i < targetFields.length; i++) {
+					if (parts[0].equals(targetFields[i])) {
+						// got a field we need
+						try {
+							memInfoValues[i] = Long.parseLong(parts[1]);
+						} catch (NumberFormatException e) {
+							throw e;
+						}
+						break; // get the next field
+					}
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+		return new NodeMemoryInfo(memInfoValues, fakeMemorySize);
+	}
+	public static void main(String[] args) {
+	    String[] meminfoTargetFields = new String[] {"MemTotal:","MemFree:","SwapTotal:","SwapFree:"};
+
+		try {
+			NodeMemInfoCollector nmi = new NodeMemInfoCollector(meminfoTargetFields);
+			ExecutorService pool = Executors.newFixedThreadPool(1);
+			while( true ) {
+				Future<NodeMemory> nmiFuture = pool.submit(nmi);
+				NodeMemory memInfo = nmiFuture.get();
+				System.out.println("... Memonfo Data:"
+						+memInfo.getMemTotal()+
+						" Memory Free:"+memInfo.getMemFree()+
+						" Swap Total:"+memInfo.getSwapTotal()+
+						" Swap Free:"+memInfo.getSwapFree());
+				synchronized(nmi) {
+					nmi.wait(4000);
+				}
+			}
+			
+		} catch( Exception e) {
+			e.printStackTrace();
+		}
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeMemInfoCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeUsersCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeUsersCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeUsersCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeUsersCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.uima.ducc.agent.Agent;
+import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
+import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo.NodeProcess;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.common.utils.id.IDuccId;
+
+/**
+ * Spawns "ps -ef --no-heading" cmd and scrapes the output to collect user processes. 
+ * 
+ * Detects and filters out Ducc daemon processes and AMQ broker.
+ * 
+ * Detects rogue processes which are processes that are not associated with any job 
+ * and not belonging to a user node reservation. 
+ * 
+ */
+public class NodeUsersCollector implements CallableNodeUsersCollector {
+	
+	DuccLogger logger;
+	DuccId jobid = null;
+	Agent agent;
+	
+	public NodeUsersCollector(Agent agent, DuccLogger logger) {
+	  this.agent = agent;
+	  this.logger = logger;
+	}
+  /**
+   * Returns true if a given userId belongs to an exclusion list defined in ducc.properties.
+   * This list contains user IDs the Agent should exclude while determining rogue processes.
+   * System owned (root, nfs, etc) processes should not be reported as rogue. 
+   * 
+   * @param userId
+   * @return
+   */
+  public boolean excludeUser(String userId ) {
+    String userFilter =
+            System.getProperty("ducc.agent.rogue.process.user.exclusion.filter");
+    if ( userFilter != null) {
+      //  exclusion list contains comma separated user ids
+      String[] excludedUsers = userFilter.split(",");
+      for ( String excludedUser : excludedUsers ) {
+        if ( excludedUser.equals(userId)) {
+          return true;
+        }
+      }
+      
+    }
+    return false;
+  }
+  /**
+   * Returns true if a given process belongs to an exclusion list defined in ducc.properties.
+   * This list contains process names the Agent should exclude while determining rogue processes.
+   * 
+   * @param process
+   * @return
+   */
+  public boolean excludeProcess(String process ) {
+    String processFilter = 
+            System.getProperty("ducc.agent.rogue.process.exclusion.filter");
+    if ( processFilter != null ) {
+      //  exclusion list contains comma separated user ids
+      String[] excludedProcesses = processFilter.split(",");
+      for ( String excludedProcess : excludedProcesses ) {
+        if ( excludedProcess.equals(process)) {
+          return true;
+        }
+      }
+    } 
+    return false;
+  }
+
+	private void aggregate( Set<NodeUsersCollector.ProcessInfo> processList, NodeUsersCollector.ProcessInfo cpi ) {
+	  boolean added = false;
+	  
+	  for( NodeUsersCollector.ProcessInfo pi: processList ) {
+	    // PIDs below are ints so safe to use ==
+	    if ( pi.getPid() == cpi.getPPid() ) { // is the current process a child of another Process? 
+	        pi.getChildren().add(cpi); // add current process as a child
+         added = true;
+         if ( pi.isRogue() ) { // if parent is rogue, a child is rogue as well
+           cpi.setRogue(true);
+           break;
+         }
+	    } else if ( pi.getChildren().size() > 0 ) {
+
+	      for(NodeUsersCollector.ProcessInfo childpi : pi.getChildren() ) {
+	        // is the current process a child of another Process?
+	        if ( childpi.getPid() == cpi.getPPid() ) {  
+	          added = true;  // dont add the child here as it will cause ConcurrentModificationException
+	                         // just mark it for inclusion in a child list below
+	          if ( childpi.isRogue() ) { // if parent is rogue, a child is rogue as well
+	            cpi.setRogue(true);
+	          }
+            break;  // stop iterating over children
+	        }
+	      }
+	    } 
+      if ( added ) {
+        pi.getChildren().add(cpi); // add current process as a child
+        if ( logger == null ) {
+          //System.out.println("********* Adding Child Process With PID:"+cpi.getPid()+ " As Child of Process:"+cpi.getPPid());
+        } else {
+          logger.info("aggregate", null, "********* Adding Child Process With PID:"+cpi.getPid()+ " As Child of Process:"+cpi.getPPid());
+        }
+        break;
+     }
+	  }
+	  // not added to the list in the code above
+	  if ( !added ) {
+	    processList.add(cpi);
+      if ( logger == null ) {
+        //System.out.println("********* Adding Process With PID:"+cpi.getPid()+ " NO PARENT");
+      } else {
+        logger.info("aggregate", null, "********* Adding Process With PID:"+cpi.getPid()+ " NO PARENT");
+      }
+    }
+	}
+	private boolean duccDaemon(String[] tokens) {
+	  String location = "duccDaemon";
+	  for( String token : tokens ) {
+      if ( token.startsWith("-Dducc.deploy.components")) {
+        int pos = token.indexOf("=");
+        if ( pos > -1 ) {
+          String component = token.substring(pos+1);
+          // if a process is uima-as service need to check if this is a rogue process
+          if ( component.trim().startsWith("uima-as")) {
+            break;  // will check if rogue process below
+          }
+        }
+        if ( logger == null ) {
+         // System.out.println(
+               //   "********** Process with PID:"+tokens[1]+ " Is a Ducc Daemon:"+token+". Skipping....");
+        } else {
+          logger.trace(location, jobid, "********** Process with PID:"+tokens[1]+ " Is a Ducc Daemon:"+token+". Skipping....");
+        }
+        return true;
+      } else if (token.startsWith("-Dactivemq.base")) {
+        if ( logger == null ) {
+         // System.out.println(
+           //       "********** Process with PID:"+tokens[1]+ " Is an ActiveMQ Broker:"+token+". Skipping....");
+        } else {
+          logger.trace(location, jobid, "********** Process with PID:"+tokens[1]+ " Is an ActiveMQ Broker:"+token+". Skipping....");
+        }
+        return true;
+      }
+    }
+	  return false;
+	}
+	public TreeMap<String,NodeUsersInfo> call() throws Exception {
+		String location = "call";
+	  TreeMap<String,NodeUsersInfo> map = new TreeMap<String,NodeUsersInfo>();
+
+    List<String> currentPids = new ArrayList<String>();
+		try {
+		  
+      ProcessBuilder pb = new ProcessBuilder("ps","-Ao","user:12,pid,ppid,args", "--no-heading");
+		  pb.redirectErrorStream(true);
+		  Process proc = pb.start();
+		  //  spawn ps command and scrape the output
+      InputStream stream = proc.getInputStream();
+			BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+			String line;
+			String regex = "\\s+";
+			// copy all known reservations reported by the OR
+      agent.copyAllUserReservations(map);
+      if ( logger == null ) {
+       // System.out.println(
+        //        "********** User Process Map Size After copyAllUserReservations:"+map.size());
+      } else {
+        logger.info(location, jobid, "********** User Process Map Size After copyAllUserReservations:"+map.size());
+      }
+      // copy all known rogue processes detected previously
+      agent.getRogueProcessReaper().copyAllUserRogueProcesses(map); 
+      if ( logger == null ) {
+        //System.out.println(
+         //       "********** User Process Map Size After copyAllUserRougeProcesses:"+map.size());
+      } else {
+        logger.info(location, jobid, "********** User Process Map Size After copyAllUserRougeProcesses:"+map.size());
+      }
+      // Add all running processes to this list. Will use this list to determine if a process has a parent
+      // which is a rogue process.
+      Set<NodeUsersCollector.ProcessInfo> processList = 
+              new HashSet<NodeUsersCollector.ProcessInfo>();
+      
+      // read the next line from ps output
+			while ((line = reader.readLine()) != null) {
+
+        String tokens[] = line.split(regex);
+        String user = tokens[0];
+        String pid = tokens[1];
+        String ppid = tokens[2];
+        String cmd = tokens[3];
+        
+				if ( tokens.length > 0 ) {
+	        // Detect and skip all ducc daemons except uima-as service
+	        if ( duccDaemon(tokens)) {
+	          continue;
+	        }
+	        if ( logger == null ) {
+            //System.out.print(line);
+	        } else {
+	          logger.trace(location, jobid, line);
+	        }
+				  //  Check if current process is owned by a user that should be excluded
+				  //  from rogue process detection. A list of excluded users is in ducc.properties
+				  //  Dont include root, nfs, and other system owned processes. Also exclude
+				  //  processes that are defined in the process exclusion list in ducc.properties 
+          if ( excludeUser(user) || excludeProcess(cmd) || Utils.getPID().equals(pid))  {
+				    continue;   // skip this process         
+				  }
+	        if ( agent != null ) {
+            NodeUsersInfo nui = null; 
+            //  Check if user record is already in the map. May have been done above in
+            //  copyAllUserReservations().
+	          if ( map.containsKey(user)) {
+	            nui = map.get(user);
+	          } else {
+              nui = new NodeUsersInfo(user);
+              map.put(user, nui);
+	          }
+	          if ( logger == null ) {
+          //    System.out.println(
+           //           "User:"+user+" Reservations:"+nui.getReservations().size()+" Rogue Processes:"+nui.getRogueProcesses().size());
+	          } else {
+	            logger.info(location, jobid, "User:"+user+" Reservations:"+nui.getReservations().size()+" Rogue Processes:"+nui.getRogueProcesses().size());
+	          }
+	          // add a process to a list of processes currently running on the node. The list will be used
+	          // to remove stale rogue processes at the end of this method
+	          currentPids.add(tokens[1]);
+            currentPids.add(pid);
+	          if ( logger == null ) {
+	          } else {
+	            logger.trace(location, jobid,"Current Process (Before Calling aggregate() - PID:"+pid+" PPID:"+ppid+" Process List Size:"+processList.size());
+	          }
+	          NodeUsersCollector.ProcessInfo pi = 
+	                  new NodeUsersCollector.ProcessInfo(Integer.parseInt(pid),Integer.parseInt(ppid));
+	          // add the process to the list of processes. If this process has a parent, it will be added as a child. Compose
+	          // hierarchy of processes so that we can use it later to determine if any given process has a parent that is rogue
+	          aggregate(processList, pi);
+	          
+	          // fetch user reservations
+            List<IDuccId> userReservations = nui.getReservations();
+            //  if user has reservations on the node, any process found is not a rogue process
+            if ( userReservations.size() > 0 ) {
+              boolean found = false;
+              //  check if this process has previously been marked as rogue
+              for( NodeProcess rogue : nui.getRogueProcesses() ) {
+                if ( rogue.getPid().equals(pid)) {
+                  found = true;
+                  break;
+                }
+              }
+              
+              if ( !found && !agent.isManagedProcess(processList, pi)) {
+                // code keeps count of java and non-java processes separately, so pass the type of process (java or not) 
+                // to allow distinct accounting 
+                nui.addPid(pid,cmd.endsWith("java"));
+              }
+              continue;  // all we know that the user has a reservation and there is a process running. If there
+                         // are reservations, we cant determine which user process is a rogue process
+            }
+            //  detect if this is a rogue process and add it to the rogue process list. First check if the current process
+            //  has a parent and if so, check if the parent is rogue. Second, if parent is not rogue (or no parent)
+            //  check if the process is in agent's inventory. If its not, we have a rogue process.
+            if ( agent.isRogueProcess(user, processList, pi) ) {
+              if ( nui.getRogueProcesses().size() == 0 || !inRogueList(nui.getRogueProcesses(),pid) ) {
+                pi.setRogue(true);
+                agent.getRogueProcessReaper().submitRogueProcessForKill(user, pid,cmd.endsWith("java"));
+              }
+            }
+	        }
+				}
+			}
+		} 
+		catch (Exception e) {
+      if ( logger == null ) {
+        e.printStackTrace();
+      } else {
+        logger.error(location, jobid, e);
+      }
+		}
+		StringBuffer sb = new StringBuffer();
+    // if no processes found, clear rogue process list and list of processes associated with a reserve
+		if ( currentPids.isEmpty()) {
+      for( Map.Entry<String,NodeUsersInfo> entry : map.entrySet()) {
+        entry.getValue().getReserveProcesses().clear();
+        entry.getValue().getRogueProcesses().clear();
+      }
+    }
+
+		for( Map.Entry<String,NodeUsersInfo> entry : map.entrySet()) {
+		  sb.append(entry.getValue().toString()).append("\n");
+    }
+    if ( logger == null ) {
+      System.out.println(sb.toString());
+      System.out.println(
+            "***************************************************************************************");
+    } else {
+      logger.info(location, jobid, sb.toString());
+      logger.info(location, jobid, "******************************************************************************");
+    }
+    // remove any rogue processes that are not in the list of current processes just collected
+    agent.getRogueProcessReaper().removeDeadRogueProcesses(currentPids);
+    return map;
+	}
+	private boolean inRogueList(List<NodeProcess> rogueProcesses, String pid) {
+    for( NodeProcess rogue : rogueProcesses ) {
+      if ( rogue.getPid().equals(pid)) {
+        return true;
+      }
+    }
+    return false;
+	}
+	public class ProcessInfo {
+	  private int pid;
+	  private int ppid;
+	  boolean rogue;
+	  Set<NodeUsersCollector.ProcessInfo> childProcesses = 
+            new HashSet<NodeUsersCollector.ProcessInfo>();
+	  ProcessInfo(int pid, int ppid) {
+	    this.pid = pid;
+	    this.ppid = ppid;
+	  }
+
+    public int getPid() {
+      return pid;
+    }
+
+    public int getPPid() {
+      return ppid;
+    }
+
+    public boolean isRogue() {
+      return rogue;
+    }
+
+    public void setRogue(boolean rogue) {
+      this.rogue = rogue;
+    }
+
+    public Set<NodeUsersCollector.ProcessInfo> getChildren() {
+      return childProcesses;
+    }
+
+	}
+	private void dump(Set<NodeUsersCollector.ProcessInfo> processList) {
+	   for( NodeUsersCollector.ProcessInfo pi: processList ) {
+       if ( logger == null ) {
+         System.out.println("Process PID:"+pi.getPid()+" PPID:"+pi.getPPid());
+       } else {
+         logger.trace("dump",null,"Process PID:"+pi.getPid()+" PPID:"+pi.getPPid());
+       }
+	     if ( pi.getChildren().size() > 0 ) {
+	        if ( logger == null ) {
+	          System.out.println("\t=>");
+	        } else {
+	          logger.trace("dump",null,"\t=>");
+	        }
+	        for(NodeUsersCollector.ProcessInfo childpi : pi.getChildren() ) {
+	          if ( logger == null ) {
+	            System.out.println("PID:"+childpi.getPid()+" PPID:"+childpi.getPPid()+" | ");
+	          } else {
+	            logger.trace("dump",null,"PID:"+childpi.getPid()+" PPID:"+childpi.getPPid()+" | ");
+	          }
+	        }
+	        if ( logger == null ) {
+	          System.out.println("\n");
+
+	        } else {
+	          logger.trace("dump",null,"\n");
+	          
+	        }
+	      } 
+	   }
+	}
+	public static void main(String[] args) {
+/*
+	  try {
+	    Set<NodeUsersCollector.ProcessInfo> processList = new HashSet<NodeUsersCollector.ProcessInfo>();
+	    
+	    NodeUsersCollector.ProcessInfo pi1 = new NodeUsersCollector.ProcessInfo(102,100);
+      NodeUsersCollector.ProcessInfo pi2 = new NodeUsersCollector.ProcessInfo(103,110);
+      NodeUsersCollector.ProcessInfo pi11 = new NodeUsersCollector.ProcessInfo(104,102);
+      NodeUsersCollector.ProcessInfo pi12 = new NodeUsersCollector.ProcessInfo(105,102);
+      NodeUsersCollector.ProcessInfo pi3 = new NodeUsersCollector.ProcessInfo(106,111);
+      
+      NodeUsersCollector collector = new NodeUsersCollector(null);
+      collector.aggregate(processList, pi1);
+//      collector.dump(processList);
+      collector.aggregate(processList, pi2);
+      collector.aggregate(processList, pi11);
+      collector.aggregate(processList, pi12);
+      collector.aggregate(processList, pi3);
+      collector.dump(processList);
+      
+	  } catch( Exception e) {
+	    e.printStackTrace();
+	  }
+	  */
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeUsersCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.cpu.DuccProcessCpuUsage;
+import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+public class ProcessCpuUsageCollector extends AbstractMetricCollector implements
+		Callable<ProcessCpuUsage> {
+	private DuccLogger logger;
+	private String pid;
+
+	public ProcessCpuUsageCollector(DuccLogger logger, String pid,
+			RandomAccessFile fileHandle, int howMany, int offset) {
+		super(fileHandle, howMany, offset);
+		this.logger = logger;
+		this.pid = pid;
+	}
+
+	public ProcessCpuUsage call() throws Exception {
+		try {
+			super.parseMetricFile();
+			return new DuccProcessCpuUsage(super.metricFileContents,
+			     super.metricFieldOffsets, super.metricFieldLengths);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
+
+	private String execTopShell() throws Exception {
+		List<String> command = new ArrayList<String>();
+		command.add("top");
+		command.add("-b");
+		command.add("-n");
+		command.add("1");
+		command.add("-p");
+		command.add(pid);
+
+		ProcessBuilder builder = new ProcessBuilder(command);
+		Process process = builder.start();
+		InputStream is = process.getInputStream();
+		InputStreamReader isr = new InputStreamReader(is);
+		BufferedReader br = new BufferedReader(isr);
+		String line;
+		int count = 0;
+		String cpu = "";
+		try {
+			while ((line = br.readLine()) != null) {
+				if (count == 7) {
+					String[] values = line.trim().split("\\s+");
+					cpu = values[9];
+					process.destroy();
+					break;
+				}
+				count++;
+			}
+		} finally {
+			if (is != null) {
+				is.close();
+			}
+		}
+		process.waitFor();
+		return cpu;
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.metrics.collectors;
+
+import java.io.RandomAccessFile;
+import java.util.concurrent.Callable;
+
+import org.apache.uima.ducc.common.agent.metrics.memory.DuccProcessResidentMemory;
+import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
+
+
+public class ProcessResidentMemoryCollector extends AbstractMetricCollector
+		implements Callable<ProcessResidentMemory> {
+	public ProcessResidentMemoryCollector(RandomAccessFile fileHandle,
+			int howMany, int offset) {
+		super(fileHandle, howMany, offset);
+	}
+
+	public ProcessResidentMemory call() throws Exception {
+		super.parseMetricFile();
+		return new DuccProcessResidentMemory(super.metricFileContents,
+				super.metricFieldOffsets, super.metricFieldLengths);
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.monitor;
+
+import java.util.Map;
+
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.ProcessReaperTask;
+import org.apache.uima.ducc.common.ANodeStability;
+import org.apache.uima.ducc.common.Node;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+
+public class AgentMonitor extends ANodeStability{
+  private NodeAgent agent;
+  private ProcessReaperTask reaperTask;
+  DuccLogger logger;
+  public AgentMonitor(NodeAgent agent, DuccLogger logger, int nodeStability, int agentMetricsRate) {
+    super(nodeStability, agentMetricsRate);
+    this.agent = agent;
+    this.logger = logger;
+    reaperTask = new ProcessReaperTask(agent,logger);
+  }
+
+  public void nodeDeath(Map<Node, Node> nodes) {
+    logger.warn("AgentMonitor.nodeDeath", null,"Agent detected a network/borker problem. Proceeding to shutdown JPs");
+    Thread t = new Thread(reaperTask);
+    t.setDaemon(true);
+    t.start();
+  }
+
+  public void missedNode(Node n, int c) {
+    logger.info("missedNode",null,"Agent missed a ping ("+c+")");
+  }
+
+  public void ping(Node node) {
+    super.nodeArrives(node);
+  }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/RogueProcessDetector.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/RogueProcessDetector.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/RogueProcessDetector.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/RogueProcessDetector.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.monitor;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.uima.ducc.agent.Agent;
+import org.apache.uima.ducc.agent.RogueProcessReaper;
+import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
+import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector.ProcessInfo;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+public class RogueProcessDetector extends AbstractDuccComponent implements Processor{
+  public static final String DUCC_PROPERTY_FILE="ducc.deploy.configuration";
+  public static final String AGENT_NODE_INVENTORY_ENDPOINT="ducc.agent.node.inventory.endpoint";
+  //  locks agent inventory 
+  public static Object lock = new Object();
+  //  processes managed by the agent
+  List<ManagedProcess> agentProcesses = new ArrayList<ManagedProcess>();
+  //  processes not managed by the agent, added to the inventory for testing via -p option
+  List<ManagedProcess> testProcesses = new ArrayList<ManagedProcess>();
+  
+  private static ActiveMQComponent duccAMQComponent = null;
+
+  List<DuccUserReservation> reservations = new ArrayList<DuccUserReservation>();
+  //  Executor which runs detection of rogue processes at fixed intervals
+  ScheduledThreadPoolExecutor executor = 
+          new ScheduledThreadPoolExecutor(5);
+  
+  String nodeToMonitor = "";
+  
+  public RogueProcessDetector(List<DuccUserReservation> reservations) {
+    super("rpd", new DefaultCamelContext());
+    this.reservations = reservations;
+  }
+  /**
+   * Route builder to receive node agent inventory updates
+   * @param delegate - processor where the node inventory is collected and filtered
+   * @param endpoint - agent inventory topic
+   * @return
+   */
+  public RouteBuilder routeBuilderForEndpoint(final Processor delegate, final String endpoint) {
+    return new RouteBuilder() {
+      public void configure() {
+        onException(Exception.class).handled(true).process(new ErrorProcessor());
+        from(endpoint)
+        .process(delegate);
+      }
+    };
+  }
+  public void start( long delay, String nodeName, String[] pids) {
+    try {
+      Agent agent = new TestAgent(agentProcesses);
+      for (String pid : pids) {
+        testProcesses.add( new ManagedProcess(pid));
+      }
+      agentProcesses.addAll(testProcesses);
+      nodeToMonitor = nodeName;
+      
+      loadProperties(DUCC_PROPERTY_FILE);
+
+      String brokerUrl = System.getProperty("ducc.broker.url");
+      
+      duccAMQComponent = ActiveMQComponent.activeMQComponent(brokerUrl);
+      //  by default, Camel includes ActiveMQComponent connected to localhost. We require
+      //  an external broker. So replace the default with one connected to the desired broker.
+      if ( super.getContext().getComponent("activemq") != null) {
+        super.getContext().removeComponent("activemq");
+      }
+      super.getContext().addComponent("activemq",duccAMQComponent);
+
+      String agentNodeInventoryEndpoint = 
+              System.getProperty(AGENT_NODE_INVENTORY_ENDPOINT);
+      System.out.println("Starting Simulator: broker="+brokerUrl+" node="+nodeName+" delay="+delay+" agent inventory="+agentNodeInventoryEndpoint);
+      super.getContext().addRoutes(this.routeBuilderForEndpoint(this, agentNodeInventoryEndpoint));
+      super.getContext().start();
+      //  Schedule rogue process detection at a given interval
+      final NodeUsersCollector nodeUsersCollector = new NodeUsersCollector(agent, null);
+      executor.scheduleAtFixedRate(new Runnable() {
+        public void run() {
+            try {
+              TreeMap<String,NodeUsersInfo> userInfo = 
+                      nodeUsersCollector.call();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+      }, 0, delay, TimeUnit.SECONDS);
+    } catch( Exception e) {
+      e.printStackTrace();
+    }
+    
+  }
+  /**
+   * Dummy Agent
+   *
+   */
+  class TestAgent implements Agent {
+    public DuccLogger logger = DuccLogger.getLogger(this.getClass(), COMPONENT_NAME);
+
+    List<ManagedProcess> deployedProcesses;
+    private RogueProcessReaper rogueProcessReaper = 
+            new RogueProcessReaper(null, 5, 10);
+    
+    public TestAgent(List<ManagedProcess> deployedProcesses) {
+      this.deployedProcesses = deployedProcesses;
+    }
+    public void startProcess(IDuccProcess process, ICommandLine commandLine,
+            IDuccStandardInfo info, DuccId workDuccId, long shareMemorySize) {
+    }
+
+    public void stopProcess(IDuccProcess process) {
+    }
+
+    public NodeIdentity getIdentity() {
+      return null;
+    }
+
+    public HashMap<DuccId, IDuccProcess> getInventoryCopy() {
+      return null;
+    }
+
+    public HashMap<DuccId, IDuccProcess> getInventoryRef() {
+      return null;
+    }
+    public boolean isManagedProcess(Set<NodeUsersCollector.ProcessInfo> processList, NodeUsersCollector.ProcessInfo cpi) {
+      try {
+        
+        synchronized(lock) {
+          for (ManagedProcess deployedProcess : deployedProcesses) {
+            if ( deployedProcess != null ) {
+              // Check if process has been deployed but has not yet reported its PID. 
+              // This is normal. It takes a bit of time until the JP reports
+              // its PID to the Agent. If there is at least one process in Agent
+              // deploy list with no PID we assume it is the one. 
+              if ( deployedProcess.getPid() == null ) {
+                return true; 
+              }
+              String dppid = deployedProcess.getPid();
+              if (dppid != null && dppid.equals(String.valueOf(cpi.getPid()))) {
+                return true;
+              }
+            }
+          }
+          for( NodeUsersCollector.ProcessInfo pi: processList ) {
+            if ( pi.getPid() == cpi.getPPid() && pi.getChildren().size() > 0 ) { // is the current process a child of another java Process? 
+               return isManagedProcess(pi.getChildren(), pi);
+            }
+          }
+        }
+      } catch ( Exception e) {
+        e.printStackTrace();
+      } 
+      return false;
+    }
+
+    public boolean isRogueProcess(String uid, Set<ProcessInfo> processList, ProcessInfo cpi)
+            throws Exception {
+        // Agent adds a process to its inventory before launching it. So it is 
+        // possible that the inventory contains a process with no PID. If there
+        // is such process in the inventory we cannot determine that a given 
+        // pid is rogue yet. Eventually, the launched process reports its PID
+        boolean foundDeployedProcessWithNoPID = false;
+        try {
+          synchronized(lock) {
+            for (ManagedProcess deployedProcess : deployedProcesses) {
+              // Check if process has been deployed but has not yet reported its PID. 
+              // This is normal. It takes a bit of time until the JP reports
+              // its PID to the Agent. If there is at least one process in Agent
+              // deploy list with no PID we assume it is the one. 
+              if ( deployedProcess.getPid() == null ) {
+                foundDeployedProcessWithNoPID = true; 
+                break;
+              } else {
+                String dppid = deployedProcess.getPid();
+                if (dppid != null && dppid.equals(String.valueOf(cpi.getPid()))) {
+                  System.out.println("++++++++++++++++ PID:"+cpi.getPid() +" Is *NOT* Rogue");
+                  return false;
+                }
+              }
+            }
+          }
+          
+        } catch( Exception e ) {
+          e.printStackTrace();
+        }
+        // not found
+        if ( foundDeployedProcessWithNoPID ) {
+          return false;
+        } else {
+          return isParentProcessRogue(processList, cpi);
+        }
+    }
+    private boolean isParentProcessRogue(Set<NodeUsersCollector.ProcessInfo> processList, NodeUsersCollector.ProcessInfo cpi) {
+      //boolean found = false;
+      for( NodeUsersCollector.ProcessInfo pi: processList ) {
+        if ( pi.getPid() == cpi.getPPid() ) { // is the current process a child of another java Process? 
+          //found = true; 
+          if ( pi.isRogue() ) { // if parent is rogue, a child is rogue as well
+            System.out.println("++++++++++++++++ parent PID:"+pi.getPid() +" Is Rogue");
+            return true;
+          }
+          System.out.println("++++++++++++++++ parent PID:"+pi.getPid() +" Is NOT Rogue");
+          return false;
+        } else {
+          if ( pi.getChildren().size() > 0 ) {
+            return isParentProcessRogue(pi.getChildren(), cpi);
+          }
+        } 
+      }
+      System.out.println("++++++++++++++++ PID:"+cpi.getPid() +" Is Rogue");
+
+      return true;
+
+    }
+    public void copyAllUserReservations(TreeMap<String, NodeUsersInfo> map) {
+      try {
+        if ( reservations != null ) {
+//          System.out.println("+++++++++++ Copying User Reservations - List Size:"+reservations.size());
+          for( DuccUserReservation r : reservations ) {
+            if ( "System".equals(r.getUserId())) {
+              continue;
+            }
+            NodeUsersInfo nui = null;
+            if ( map.containsKey(r.getUserId())) {
+              nui = map.get(r.getUserId());
+            } else {
+              nui = new NodeUsersInfo(r.getUserId());
+              map.put(r.getUserId(), nui);
+            }
+            nui.addReservation(r.getReserveID());
+          }
+        } else {
+//          System.out.println(" ***********  No Reservations");
+        }
+      } catch( Exception e) {
+        e.printStackTrace();
+      } 
+    }
+
+    public RogueProcessReaper getRogueProcessReaper() {
+      return rogueProcessReaper;
+    }
+    
+  }
+  
+  public static class ManagedProcess {
+    String pid;
+
+    public ManagedProcess(String pid) {
+      this.pid = pid;
+    }
+
+    public String getPid() {
+      return pid;
+    }
+
+    public void setPid(String pid) {
+      this.pid = pid;
+    }
+
+  }
+  
+  
+  public static void main(String[] args) {
+    
+    CommandLineParser parser = new BasicParser( );
+    Options options = new Options( );
+    options.addOption("h", "help", false, "Print this usage information");
+    options.addOption( new Option("d", true,"") );
+    options.addOption( new Option("r", true,"") );
+    options.addOption( new Option("n", true,"") );
+    options.addOption( new Option("p", true,"") );
+    // Parse the program arguments
+    CommandLine commandLine = null;    
+    try {
+      commandLine = parser.parse( options, args );
+    } catch( Exception e) {
+      e.printStackTrace();
+      System.exit(1);
+    }
+    
+    long delay = 10;
+    String[] pids = new String[0];
+    String nodeName = null;
+
+    
+    List<DuccUserReservation> reservations = new ArrayList<DuccUserReservation>();
+    if (commandLine.hasOption("r")) {
+      try {
+        String[] users = ((String)commandLine.getOptionValue("r")).split(",");
+        int dummyReservationId = 11111;
+        for( String userId : users ) {
+          DuccUserReservation reservation = 
+                  new DuccUserReservation(userId, new DuccId(dummyReservationId++), null);
+          reservations.add(reservation);
+        }
+      } catch( Exception e) {
+        e.printStackTrace();
+        System.exit(1);
+      }
+    }
+
+    if (commandLine.hasOption("p")) {
+      try {
+        pids = ((String)commandLine.getOptionValue("p")).split(",");
+      } catch( Exception e) {
+        e.printStackTrace();
+        System.exit(1);
+      }
+    }
+
+    if (commandLine.hasOption("d")) {
+      try {
+        delay = Long.parseLong((String)commandLine.getOptionValue("d"));
+      } catch( Exception e) {
+        e.printStackTrace();
+        System.exit(1);
+      }
+    }
+
+    if (commandLine.hasOption("n")) {
+      try {
+        nodeName = (String)commandLine.getOptionValue("n");
+      } catch( Exception e) {
+        e.printStackTrace();
+        System.exit(1);
+      }
+    } else {
+      try {
+        nodeName = InetAddress.getLocalHost().getHostName();
+      } catch( Exception e) {
+        e.printStackTrace();
+        System.exit(1);
+
+      }
+    }
+
+    if (commandLine.hasOption("h")) {
+      System.out.println("Usage: java -cp <..> -p <pid1,pid2,..> -n <node name> -d <delay> -r <user1,user2,...>");
+      System.out.println("Options:");
+      System.out.println("-p: comma separated list of pids as if the processes where managed by an agent");
+      System.out.println("-n: node name ");
+      System.out.println("-d: delay at which to collect processes");
+      System.out.println("-r: comma separated list of user ids owning a reservation on a node");
+      System.exit(1);
+    }
+    RogueProcessDetector detector = 
+            new RogueProcessDetector(reservations);
+
+    
+    detector.start( delay, nodeName, pids);
+
+  }
+  public void process(Exchange arg0) throws Exception {
+    if ( arg0.getIn().getBody() instanceof NodeInventoryUpdateDuccEvent ) {
+      NodeInventoryUpdateDuccEvent event =
+              (NodeInventoryUpdateDuccEvent)arg0.getIn().getBody();
+      HashMap<DuccId, IDuccProcess> processes = event.getProcesses();
+      try {
+        boolean clear = true;
+        synchronized(lock) {
+          for( Map.Entry<DuccId, IDuccProcess> process : processes.entrySet()) {
+            if ( process.getValue().getNodeIdentity().getName().equals(nodeToMonitor) ) {
+              if ( clear ) {
+                clear = false;
+                agentProcesses.clear(); // remove old inventory and refresh from new agent inventory
+                agentProcesses.addAll(testProcesses); // copy test processes
+              }
+              System.out.println("---------- Got Node Process - PID="+process.getValue().getPID());
+              agentProcesses.add(new ManagedProcess(process.getValue().getPID()));
+            }
+          }
+        }
+      } catch( Exception e ) {
+        e.printStackTrace();
+      }
+    }
+  }
+  class DebugProcessor implements Processor {
+
+    public void process(Exchange arg0) throws Exception {
+      
+    }
+    
+  }
+  class ErrorProcessor implements Processor {
+
+    public void process(Exchange exchange) throws Exception {
+      Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+      caused.printStackTrace();
+    }
+    
+  }
+  
+  
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/monitor/RogueProcessDetector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/BaseProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/BaseProcessor.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/BaseProcessor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/BaseProcessor.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.processors;
+
+public abstract class BaseProcessor {
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/BaseProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java?rev=1427906&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java Wed Jan  2 19:02:18 2013
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.agent.processors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.common.IDuccProcess;
+import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
+
+/**
+ * 
+ * 
+ */
+public class DefaultNodeInventoryProcessor implements NodeInventoryProcessor {
+	DuccLogger logger = new DuccLogger(this.getClass(), "AGENT");
+	boolean inventoryChanged=true;
+	private NodeAgent agent;
+	private HashMap<DuccId, IDuccProcess> previousInventory;	
+	private int forceInventoryUpdateMaxThreshold=1;
+	private long counter=0;
+	
+	public  DefaultNodeInventoryProcessor(NodeAgent agent,String inventoryPublishRateSkipCount) {
+		this.agent = agent;
+		try {
+			forceInventoryUpdateMaxThreshold = Integer.parseInt(inventoryPublishRateSkipCount);
+		} catch( Exception e) {
+		}
+		//  Dont allow 0
+		if ( forceInventoryUpdateMaxThreshold == 0 ) {
+		  forceInventoryUpdateMaxThreshold = 1;
+		}
+	}
+	/**
+	 * Get a copy of agent {@code Process} inventory
+	 */
+	public HashMap<DuccId, IDuccProcess> getInventory() {
+		return agent.getInventoryCopy();
+	}
+	/**
+	 * 
+	 */
+	public void process(Exchange outgoingMessage) throws Exception {
+		String methodName="process";
+		//	Get a deep copy of agent's inventory
+		HashMap<DuccId, IDuccProcess> inventory = getInventory();
+		// Determine if the inventory changed since the last publishing was done
+		// First check if the inventory expanded or shrunk. If the same in size, 
+		// compare process states and PID. If either of the two changed for any
+		// of the processes trigger immediate publish. If no changes found, publish
+		// according to skip counter (ducc.agent.node.inventory.publish.rate.skip)
+		// configured in ducc.properties.
+		if ( previousInventory != null ) {
+			if (inventory.size() != previousInventory.size()) {
+				inventoryChanged = true;
+			} else {
+				// Inventory maps are equal in size, check if all processes in the current
+				// inventory exist in the previous inventory snapshot. If not, it means that
+				// that perhaps a new process was added and one was removed. In this case,
+				// force the publish, since there was a change.
+				for( Map.Entry<DuccId, IDuccProcess> currentProcess: inventory.entrySet()) {
+					//	Check if a process in the current inventory exists in a previous 
+					//  inventory snapshot
+					if ( previousInventory.containsKey(currentProcess.getKey())) {
+						IDuccProcess previousProcess = 
+								previousInventory.get(currentProcess.getKey());
+						//	check if either PID or process state has changed
+						if ( currentProcess.getValue().getPID() != null && 
+								previousProcess.getPID() == null) {
+							inventoryChanged = true;
+							break;
+						} else if ( !currentProcess.getValue().getProcessState().equals(previousProcess.getProcessState())) {
+							inventoryChanged = true;
+							break;
+						}
+					} else {
+						// New inventory contains a process not in the previous snapshot
+						inventoryChanged = true;
+						break;
+					}
+				}
+			}
+		}
+		//	Get this inventory snapshot
+		previousInventory = inventory;
+		//	Broadcast inventory if there is a change or configured number of epochs
+		//  passed since the last broadcast. This is configured in ducc.properties with
+		//  property ducc.agent.node.inventory.publish.rate.skip
+		try {
+			if ( inventory.size() > 0 && (inventoryChanged || ( counter > 0 && (counter % forceInventoryUpdateMaxThreshold ) == 0)) )  {
+				
+				outgoingMessage.getIn().setBody(new NodeInventoryUpdateDuccEvent(inventory));
+				StringBuffer sb = new StringBuffer("Node Inventory ("+inventory.size()+")");
+				for( Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
+	        int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null ) ? 0 : 
+	          p.getValue().getUimaPipelineComponents().size();
+					sb.append("\n\t[Process Type=").
+					   append(p.getValue().getProcessType()).
+					   append(" DUCC ID=").
+					   append(p.getValue().getDuccId()).
+					   append(" PID=").
+					   append(p.getValue().getPID()).
+					   append(" State=").append(p.getValue().getProcessState()).
+					   append(" Resident Memory=").append(p.getValue().getResidentMemory()).
+					   append(" Init Stats List Size:"+pipelineInitStats).
+					   append("] ");
+          if ( p.getValue().getProcessState().equals(ProcessState.Stopped) ||
+                  p.getValue().getProcessState().equals(ProcessState.Failed) ||
+                  p.getValue().getProcessState().equals(ProcessState.Killed)) {
+              sb.append(" Reason:"+p.getValue().getReasonForStoppingProcess());
+             }
+				}
+				logger.info(methodName, null, "Agent "+agent.getIdentity().getName()+" Posting Inventory:"+sb.toString());
+			} else {
+			  // Add null to the body of the message. A filter 
+			  // defined in the Camel route (AgentConfiguration.java)
+			  // has a predicate to check for null body and throws
+			  // away such a message.
+	       outgoingMessage.getIn().setBody(null);
+			}
+		} catch( Exception e) {
+			logger.error(methodName, null, e);
+		} finally {
+			if ( inventoryChanged ) {
+				counter = 0;
+			} else {
+				counter++;
+			}
+			inventoryChanged = false;
+		}
+	}
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native