You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2016/10/20 11:48:52 UTC

svn commit: r1765811 - in /uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator: OrchestratorComponent.java OrchestratorState.java

Author: degenaro
Date: Thu Oct 20 11:48:52 2016
New Revision: 1765811

URL: http://svn.apache.org/viewvc?rev=1765811&view=rev
Log:
UIMA-5060 DUCC Orchestrator (OR) "warm" restart issues

- use the agent's last-seen OR publication sequence number if it is greater than the Orchestrator's current value

Modified:
    uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java
    uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java

Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java?rev=1765811&r1=1765810&r2=1765811&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java Thu Oct 20 11:48:52 2016
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.camel.CamelContext;
+import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
 import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
 import org.apache.uima.ducc.common.component.AbstractDuccComponent;
@@ -424,8 +425,21 @@ implements Orchestrator {
 		HashMap<DuccId, IDuccProcess> processMap = duccEvent.getProcesses();
 		stateManager.reconcileState(processMap);
 		NodeAccounting.getInstance().heartbeat(processMap);
+		adjustPublicationSequenceNumber(duccEvent);
 		logger.trace(methodName, null, messages.fetch("exit"));
 	}
+	
+	/*
+	 * If an Agent has seen a higher sequence number than that currently
+	 * published by Orchestrator, then use the higher value.  The likely
+	 * cause is the loss of access to the orchestrator-state.properties file.
+	 */
+	private void adjustPublicationSequenceNumber(NodeInventoryUpdateDuccEvent duccEvent) {
+		NodeIdentity nodeIdentity = duccEvent.getNodeIdentity();
+		long seqNo = duccEvent.getSequence();
+		OrchestratorState.getInstance().setNextSequenceNumberStateIfGreater(nodeIdentity, seqNo);
+	}
+	
 	/**
 	 * Publish Orchestrator State
 	 */

Modified: uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java?rev=1765811&r1=1765810&r2=1765811&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java Thu Oct 20 11:48:52 2016
@@ -24,7 +24,9 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
 import org.apache.uima.ducc.common.utils.id.DuccId;
@@ -43,7 +45,7 @@ public class OrchestratorState {
 	private static OrchestratorState instance = null;
 	private static DuccId jobid = null;
 	
-	private long sequenceNumberState = -1;
+	private AtomicLong sequenceNumberState = new AtomicLong(-1);
 	
 	public static OrchestratorState getInstance() {
 		String location = "getInstance";
@@ -66,19 +68,37 @@ public class OrchestratorState {
 	public long getNextSequenceNumberState() {
 		String location = "getNextSequenceNumberState";
 		synchronized(this) {
-			sequenceNumberState++;
+			long value = sequenceNumberState.incrementAndGet();
 			exportState();
 			logger.debug(location, jobid, ""+sequenceNumberState);
-			return sequenceNumberState;
+			return value;
 		}
 	}
 	
 	public void setNextSequenceNumberState(long value) {
 		String location = "setNextSequenceNumberState";
 		synchronized(this) {
-			sequenceNumberState = value;
+			sequenceNumberState.set(value);
 			exportState();
-			logger.debug(location, jobid, ""+sequenceNumberState);
+			logger.debug(location, jobid, ""+value);
+		}
+	}
+	
+	public void setNextSequenceNumberStateIfGreater(NodeIdentity nodeIdentity, long value) {
+		String location = "setNextSequenceNumberStateIfGreater";
+		synchronized(this) {
+			String node = "?";
+			if(nodeIdentity != null) {
+				node = nodeIdentity.getName();
+			}
+			long currentValue = sequenceNumberState.get();
+			if(value > currentValue) {
+				setNextSequenceNumberState(value);
+				logger.warn(location, jobid, "agent:"+node+" "+"value:"+value+" "+"or:"+currentValue);
+			}
+			else {
+				logger.trace(location, jobid, "agent:"+node+" "+"value:"+value+" "+"or:"+currentValue);
+			}
 		}
 	}