You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/06/06 11:50:44 UTC

svn commit: r1600836 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator: OrchestratorState.java config/OrchestratorConfiguration.java

Author: degenaro
Date: Fri Jun  6 09:50:44 2014
New Revision: 1600836

URL: http://svn.apache.org/r1600836
Log:
UIMA-3857 DUCC Orchestrator (OR) should checkpoint and restore publication sequence number

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java   (with props)
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java?rev=1600836&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java Fri Jun  6 09:50:44 2014
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+public class OrchestratorState {
+
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(OrchestratorState.class.getName());
+	
+	private static OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private static String fileName = orchestratorCommonArea.getStateDirectory()+File.separator+"orchestrator-state.json";
+	
+	private static Gson gson = new Gson();
+	private static OrchestratorState instance = null;
+	private static DuccId jobid = null;
+	
+	private long sequenceNumberState = -1;
+	private long sequenceNumberStateAbbreviated = -1;
+	
+	public static OrchestratorState getInstance() {
+		String location = "getInstance";
+		synchronized(OrchestratorState.class) {
+			logger.debug(location, jobid, ""+instance);
+			if(instance == null) {
+				instance = new OrchestratorState();
+				instance.initialize();
+			}
+		}
+		return instance;
+	}
+	
+	private void initialize() {
+		String location = "initialize";
+		logger.debug(location, jobid, ""+instance);
+		importState();
+	}
+	
+	public long getNextSequenceNumberState() {
+		String location = "getNextSequenceNumberState";
+		synchronized(this) {
+			sequenceNumberState++;
+			exportState();
+			logger.debug(location, jobid, ""+sequenceNumberState);
+			return sequenceNumberState;
+		}
+	}
+	
+	public void setNextSequenceNumberState(long value) {
+		String location = "setNextSequenceNumberState";
+		synchronized(this) {
+			sequenceNumberState = value;
+			exportState();
+			logger.debug(location, jobid, ""+sequenceNumberState);
+		}
+	}
+	
+	public long getNextSequenceNumberStateAbbreviated() {
+		String location = "getNextSequenceNumberStateAbbreviated";
+		synchronized(this) {
+			sequenceNumberStateAbbreviated++;
+			exportState();
+			logger.debug(location, jobid, ""+sequenceNumberStateAbbreviated);
+			return sequenceNumberStateAbbreviated;
+		}
+	}
+	
+	public void setNextSequenceNumberStateAbbreviated(long value) {
+		String location = "setNextSequenceNumberStateAbbreviated";
+		synchronized(this) {
+			sequenceNumberStateAbbreviated = value;
+			exportState();
+			logger.debug(location, jobid, ""+sequenceNumberStateAbbreviated);
+		}
+	}
+	
+	private void copy(OrchestratorState importedState) {
+		String location = "copy";
+		if(importedState != null) {
+			setNextSequenceNumberState(importedState.getNextSequenceNumberState());
+			setNextSequenceNumberStateAbbreviated(importedState.getNextSequenceNumberStateAbbreviated());
+		}
+		else {
+			logger.warn(location, jobid, "no previous state found");
+		}
+	}
+	
+	private void importState() {
+		String location = "importState";
+		try {
+			importer();
+		}
+		catch(Exception e) {
+			logger.warn(location, jobid, e);
+		}
+	}
+	
+	private void importer() throws IOException {
+		String location = "importer";
+		FileReader fr = null;
+		BufferedReader br = null;
+		try {
+			logger.debug(location, jobid, fileName);
+			fr = new FileReader(fileName);
+			br = new BufferedReader(fr);
+			Type typeOfMap = new TypeToken<OrchestratorState>() { }.getType();
+			OrchestratorState importedState = gson.fromJson(br, typeOfMap);
+			br.close();
+			fr.close();
+			copy(importedState);
+		}
+		finally {
+			if(br != null) {
+				br.close();
+			}
+			if(fr != null) {
+				fr.close();
+			}
+		}
+	}
+	
+	private void exportState() {
+		String location = "exportState";
+		try {
+			exporter();
+		}
+		catch(Exception e) {
+			logger.error(location, jobid, e);
+		}
+	}
+	
+	private void exporter() throws IOException {
+		String location = "exporter";
+		FileWriter fw = null;
+		try {
+			logger.debug(location, jobid, fileName);
+			String json = gson.toJson(this);
+			fw = new FileWriter(fileName);
+			fw.write(json);
+			fw.close();
+		}
+		finally {
+			if(fw != null) {
+				fw.close();
+			}
+		}
+	}
+	
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/OrchestratorState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java?rev=1600836&r1=1600835&r2=1600836&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java Fri Jun  6 09:50:44 2014
@@ -18,8 +18,6 @@
 */
 package org.apache.uima.ducc.orchestrator.config;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.camel.Body;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -33,9 +31,11 @@ import org.apache.uima.ducc.common.excep
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
 import org.apache.uima.ducc.common.utils.XStreamUtils;
+import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.orchestrator.Orchestrator;
 import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
 import org.apache.uima.ducc.orchestrator.OrchestratorComponent;
+import org.apache.uima.ducc.orchestrator.OrchestratorState;
 import org.apache.uima.ducc.orchestrator.event.OrchestratorEventListener;
 import org.apache.uima.ducc.transport.DuccTransportConfiguration;
 import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
@@ -68,6 +68,7 @@ public class OrchestratorConfiguration {
 	@Autowired DuccTransportConfiguration orchestratorTransport;
 
 	private DuccLogger duccLogger = DuccLoggerComponents.getOrLogger(OrchestratorConfiguration.class.getName());
+	private DuccId jobid = null;
 	
 	/**
 	 * Creates Camel router that will handle incoming request messages. Each message will
@@ -235,18 +236,20 @@ public class OrchestratorConfiguration {
 	 */
 	private class OrchestratorStateProcessor implements Processor {
 		private Orchestrator orchestrator;
-		private   AtomicLong sequence = new AtomicLong();
 		
 		private OrchestratorStateProcessor(Orchestrator orchestrator) {
 			this.orchestrator = orchestrator;
 		}
 		public void process(Exchange exchange) throws Exception {
+			String location = "OrchestratorStateProcessor.process";
 			// Fetch new state from Orchestrator
 			OrchestratorStateDuccEvent jse = orchestrator.getState();
 			//	add sequence number to the outgoing message. This should be used to manage
 			//  processing order in the consumer
-			jse.setSequence(sequence.addAndGet(1));
-
+			OrchestratorState orchestratorState = OrchestratorState.getInstance();
+			long seqNo = orchestratorState.getNextSequenceNumberState();
+			duccLogger.debug(location, jobid, ""+seqNo);
+			jse.setSequence(seqNo);
 			//	Add the state object to the Message
 			exchange.getIn().setBody(jse);
 		}
@@ -290,17 +293,20 @@ public class OrchestratorConfiguration {
 	 */
 	private class OrchestratorAbbreviatedStateProcessor implements Processor {
 		private Orchestrator orchestrator;
-		private   AtomicLong sequence = new AtomicLong();
 		
 		private OrchestratorAbbreviatedStateProcessor(Orchestrator orchestrator) {
 			this.orchestrator = orchestrator;
 		}
 		public void process(Exchange exchange) throws Exception {
+			String location = "OrchestratorAbbreviatedStateProcessor.process";
 			// Fetch new state from Orchestrator
 			OrchestratorAbbreviatedStateDuccEvent jse = orchestrator.getAbbreviatedState();
 			//	add sequence number to the outgoing message. This should be used to manage
 			//  processing order in the consumer
-			jse.setSequence(sequence.addAndGet(1));
+			OrchestratorState orchestratorState = OrchestratorState.getInstance();
+			long seqNo = orchestratorState.getNextSequenceNumberStateAbbreviated();
+			duccLogger.debug(location, jobid, ""+seqNo);
+			jse.setSequence(seqNo);
 			//	Add the state object to the Message
 			exchange.getIn().setBody(jse);
 		}