You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/04/01 21:33:22 UTC

svn commit: r1670759 - /uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/timeout/TimeoutManager.java

Author: degenaro
Date: Wed Apr  1 19:33:22 2015
New Revision: 1670759

URL: http://svn.apache.org/r1670759
Log:
UIMA-4318 DUCC Web Server (WS) reduce JD storage use

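- use the work item (IWorkItem) instead of its FSM (IFsm) as the key to the timeout maps
- call setRemoveOnCancelPolicy(true) on the scheduled executor so that cancelled timeout tasks are removed from its work queue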

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/timeout/TimeoutManager.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/timeout/TimeoutManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/timeout/TimeoutManager.java?rev=1670759&r1=1670758&r2=1670759&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/timeout/TimeoutManager.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/timeout/TimeoutManager.java Wed Apr  1 19:33:22 2015
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.uima.ducc.container.common.MessageBuffer;
@@ -52,9 +53,14 @@ public class TimeoutManager implements I
 	
 	private long ackTimeout= 60*1000;
 	
-	private ConcurrentHashMap<IFsm,ITimeoutTask> mapTask = new ConcurrentHashMap<IFsm,ITimeoutTask>();
-	private ConcurrentHashMap<IFsm,ScheduledFuture<?>> mapFuture = new ConcurrentHashMap<IFsm,ScheduledFuture<?>>();
+	private ConcurrentHashMap<IWorkItem,ITimeoutTask> mapTask = new ConcurrentHashMap<IWorkItem,ITimeoutTask>();
+	private ConcurrentHashMap<IWorkItem,ScheduledFuture<?>> mapFuture = new ConcurrentHashMap<IWorkItem,ScheduledFuture<?>>();
 
+	private TimeoutManager() {
+		ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) scheduledExecutorService;
+		scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
+	}
+	
 	@Override
 	public void pendingAck(IActionData actionData) {
 		String location = "pendingAck";
@@ -64,7 +70,7 @@ public class TimeoutManager implements I
 			IEvent event = WiFsm.Ack_Timer_Pop;
 			long deadline = System.currentTimeMillis()+ackTimeout;
 			ITimeoutTask timeoutTask = new TimeoutTask(fsm, event, actionData, deadline);
-			register(fsm, timeoutTask);
+			register(wi, timeoutTask);
 			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 			mb.append(Standardize.Label.deadline+"+"+ackTimeout/1000);
 			logger.debug(location, ILogger.null_id, mb.toString());
@@ -81,8 +87,7 @@ public class TimeoutManager implements I
 			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 			logger.debug(location, ILogger.null_id, mb.toString());
 			IWorkItem wi = actionData.getWorkItem();
-			IFsm fsm = wi.getFsm();
-			unregister(fsm);
+			unregister(wi);
 		}
 		catch(Exception e) {
 			logger.error(location, ILogger.null_id, e);
@@ -100,7 +105,7 @@ public class TimeoutManager implements I
 			long endTimeout = jd.getWorkItemTimeoutMillis();
 			long deadline = System.currentTimeMillis()+endTimeout;
 			ITimeoutTask timeoutTask = new TimeoutTask(fsm, event, actionData, deadline);
-			register(fsm, timeoutTask);
+			register(wi, timeoutTask);
 			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 			mb.append(Standardize.Label.deadline+"+"+endTimeout/1000);
 			logger.debug(location, ILogger.null_id, mb.toString());
@@ -117,8 +122,7 @@ public class TimeoutManager implements I
 			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 			logger.debug(location, ILogger.null_id, mb.toString());
 			IWorkItem wi = actionData.getWorkItem();
-			IFsm fsm = wi.getFsm();
-			unregister(fsm);
+			unregister(wi);
 		}
 		catch(Exception e) {
 			logger.error(location, ILogger.null_id, e);
@@ -132,23 +136,22 @@ public class TimeoutManager implements I
 			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 			logger.debug(location, ILogger.null_id, mb.toString());
 			IWorkItem wi = actionData.getWorkItem();
-			IFsm fsm = wi.getFsm();
-			unregister(fsm);
+			unregister(wi);
 		}
 		catch(Exception e) {
 			logger.error(location, ILogger.null_id, e);
 		}
 	}
 	
-	private void register(IFsm fsm, ITimeoutTask timeoutTask) {
+	private void register(IWorkItem wi, ITimeoutTask timeoutTask) {
 		String location = "register";
 		try {
-			mapTask.put(fsm, timeoutTask);
+			mapTask.put(wi, timeoutTask);
 			Callable<?> callable = timeoutTask;
 			long delay = timeoutTask.getDeadline() - System.currentTimeMillis();
 			TimeUnit timeUnit = TimeUnit.MILLISECONDS;
 			ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(callable, delay, timeUnit);
-			mapFuture.put(fsm, scheduledFuture);
+			mapFuture.put(wi, scheduledFuture);
 			//
 			IActionData actionData = timeoutTask.getActionData();
 			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
@@ -161,14 +164,14 @@ public class TimeoutManager implements I
 		}
 	}
 	
-	private void unregister(IFsm fsm) {
+	private void unregister(IWorkItem wi) {
 		String location = "unregister";
 		try {
-			ScheduledFuture<?> scheduledFuture = mapFuture.remove(fsm);
+			ScheduledFuture<?> scheduledFuture = mapFuture.remove(wi);
 			if(scheduledFuture != null) {
 				scheduledFuture.cancel(false);
 			}
-			ITimeoutTask timeoutTask = mapTask.remove(fsm);
+			ITimeoutTask timeoutTask = mapTask.remove(wi);
 			if(timeoutTask != null) {
 				IActionData actionData = timeoutTask.getActionData();
 				MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
@@ -195,12 +198,11 @@ public class TimeoutManager implements I
 			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
 			logger.debug(location, ILogger.null_id, mb.toString());
 			IWorkItem wi = actionData.getWorkItem();
-			IFsm fsm = wi.getFsm();
-			unregister(fsm);
+			unregister(wi);
 		}
 		catch(Exception e) {
 			logger.error(location, ILogger.null_id, e);
 		}
 	}
-
+	
 }