You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2019/07/19 15:02:09 UTC

svn commit: r1863406 - /uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java

Author: degenaro
Date: Fri Jul 19 15:02:08 2019
New Revision: 1863406

URL: http://svn.apache.org/viewvc?rev=1863406&view=rev
Log:
UIMA-6098 DUCC reliability Agent should ignore any non-master OR publication

Modified:
    uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java

Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java?rev=1863406&r1=1863405&r2=1863406&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/event/AgentEventListener.java Fri Jul 19 15:02:08 2019
@@ -18,10 +18,10 @@
 */
 package org.apache.uima.ducc.agent.event;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.Body;
@@ -41,6 +41,7 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.ProcessStartDuccEvent;
 import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
 import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent;
+import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
 import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
 import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
@@ -237,8 +238,6 @@ public class AgentEventListener implemen
 		}
 	}
 	
-	private Map<String,String> map = new ConcurrentHashMap<String,String>();
-	
 	/**
 	 * This method is called by Camel when PM sends DUCC state to agent's queue. It 
 	 * takes responsibility of reconciling processes on this node. 
@@ -257,28 +256,34 @@ public class AgentEventListener implemen
 				
 				String host = duccEvent.getProducerHost();
 				DuccHeadState dhs = duccEvent.getDuccHeadState();
+				int jobs = 0;
+				List<IDuccJobDeployment> listJobs = duccEvent.getJobList();
+				if(listJobs != null) {
+					jobs = listJobs.size();
+				}
+				int reservations = 0;
+				List<DuccUserReservation> listReservations = duccEvent.getUserReservations();
+				if(listReservations != null) {
+					reservations = listReservations.size();
+				}
+				long tid = Thread.currentThread().getId();
+				String message = "sequence="+sequence+" "+"type="+dhs+" "+"producer="+host+" "+"jobs="+jobs+" "+"reservations="+reservations+" "+"tid="+tid;
 				switch(dhs) {
-				case backup:
-					if(!map.containsKey(host)) {
-						map.put(host, host);
-						logger.warn(location, jobid, "suspended"+" "+"host:"+host);
-					}
-					return;
 				case master:
-					if(map.containsKey(host)) {
-						map.remove(host);
-						logger.warn(location, jobid, "resumed"+" "+"host:"+host);
-					}
+					// Issue info and process master type publication
+					logger.info(location, null, "accept=Yes"+" "+message);
 					break;
 				default:
-					break;
+					// Issue warning and ignore non-master type publication
+					logger.warn(location, null, "accept=No"+" "+message);
+					return;
 				}
 				
 				// check for out of band messages. Expecting a message with a
 				// sequence number larger than the previous message.
 				if (sequence > lastSequence.get()) {
 					lastSequence.set(sequence);
-					logger.info("reportIncomingStateForThisNode", null,
+					logger.debug("reportIncomingStateForThisNode", null,
 							"Received OR Sequence:" + sequence + " Thread ID:"
 									+ Thread.currentThread().getId());
 				} else {