You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/10/16 10:37:09 UTC
svn commit: r1398696 - in
/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl:
EnhancementJobHandler.java EventJobManagerImpl.java
Author: rwesten
Date: Tue Oct 16 08:37:08 2012
New Revision: 1398696
URL: http://svn.apache.org/viewvc?rev=1398696&view=rev
Log:
STANBOL-779: Adds a new EnhancementJobObserver class to the Event Job Manager that is used to track when the asynchronously executed enhancement job finishes. The new class ensures that the completion of the enhancement process is correctly detected even if it happened before the initial check.
Modified:
stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java
Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java?rev=1398696&r1=1398695&r2=1398696&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java Tue Oct 16 08:37:08 2012
@@ -25,13 +25,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -40,13 +38,14 @@ import org.apache.clerezza.rdf.core.NonL
import org.apache.stanbol.enhancer.servicesapi.EngineException;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngine;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager;
-import org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ibm.icu.lang.UCharacter.SentenceBreak;
+
public class EnhancementJobHandler implements EventHandler {
private EnhancementEngineManager engineManager;
@@ -69,7 +68,7 @@ public class EnhancementJobHandler imple
* contentItems and the values are the objects used to interrupt the
* requesting thread as soon as the enhancement process has finished.
*/
- private Map<EnhancementJob,Object> processingJobs;
+ private Map<EnhancementJob,EnhancementJobObserver> processingJobs;
private final ReadWriteLock processingLock = new ReentrantReadWriteLock();
private Thread observerDaemon;
@@ -85,11 +84,11 @@ public class EnhancementJobHandler imple
this.engineManager = engineManager;
processingLock.writeLock().lock();
try {
- processingJobs = new LinkedHashMap<EnhancementJob,Object>();
+ processingJobs = new LinkedHashMap<EnhancementJob,EnhancementJobObserver>();
} finally{
processingLock.writeLock().unlock();
}
- observerDaemon = new Thread(new EnhancementJobObserver());
+ observerDaemon = new Thread(new EnhancementJobObserverDaemon());
observerDaemon.setName("Event Job Manager Observer Daemon");
observerDaemon.setDaemon(true);
observerDaemon.start();
@@ -134,19 +133,19 @@ public class EnhancementJobHandler imple
* @return An object that will get {@link Object#notifyAll()} as soon as
* {@link EnhancementJob#isFinished()} or this instance is deactivated
*/
- public Object register(EnhancementJob enhancementJob){
+ public EnhancementJobObserver register(EnhancementJob enhancementJob){
final boolean init;
- Object o;
+ EnhancementJobObserver observer;
processingLock.writeLock().lock();
try {
if(enhancementJob == null || processingJobs == null){
return null;
}
- o = processingJobs.get(enhancementJob);
- if(o == null){
- o = new Object();
- logJobInfo(enhancementJob, "Add EnhancementJob:");
- processingJobs.put(enhancementJob, o);
+ observer = processingJobs.get(enhancementJob);
+ if(observer == null){
+ observer = new EnhancementJobObserver(enhancementJob);
+ logJobInfo(log, enhancementJob, "Add EnhancementJob:",false);
+ processingJobs.put(enhancementJob, observer);
init = true;
} else {
init = false;
@@ -155,6 +154,7 @@ public class EnhancementJobHandler imple
processingLock.writeLock().unlock();
}
if(init){
+ observer.acquire();
enhancementJob.startProcessing();
log.debug("++ w: {}","init execution");
enhancementJob.getLock().writeLock().lock();
@@ -166,7 +166,7 @@ public class EnhancementJobHandler imple
enhancementJob.getLock().writeLock().unlock();
}
}
- return o;
+ return observer;
}
@Override
@@ -283,18 +283,21 @@ public class EnhancementJobHandler imple
*/
private void finish(EnhancementJob job){
processingLock.writeLock().lock();
- Object o;
+ EnhancementJobObserver observer;
try {
- o = processingJobs.remove(job);
+ observer = processingJobs.remove(job);
} finally {
processingLock.writeLock().unlock();
}
- if(o != null) {
- synchronized (o) {
- logJobInfo(job, "Finished EnhancementJob:");
+ if(observer != null) {
+ try {
+ logJobInfo(log, job, "Finished EnhancementJob:",false);
log.debug("++ n: finished processing ContentItem {} with Chain {}",
job.getContentItem().getUri(),job.getChainName());
- o.notifyAll();
+ } finally {
+ //release the semaphore to send signal to the EventJobManager waiting
+ //for the results
+ observer.release();
}
} else {
log.warn("EnhancementJob for ContentItem {} is not " +
@@ -335,15 +338,16 @@ public class EnhancementJobHandler imple
* Logs basic infos about the Job as INFO and detailed infos as DEBUG
* @param job
*/
- protected void logJobInfo(EnhancementJob job, String header) {
+ protected static void logJobInfo(Logger log, EnhancementJob job, String header, boolean logExecutions) {
if(header != null){
log.info(header);
}
- log.info(" state: {}",job.isFinished()?"finished":job.isFailed()?"failed":"processing");
- log.info(" chain: {}",job.getChainName());
+ log.info(" finished: {}",job.isFinished());
+ log.info(" state: {}",job.isFailed()?"failed":"processing");
+ log.info(" chain: {}",job.getChainName());
log.info(" content-item: {}", job.getContentItem().getUri());
- log.debug(" executions:");
- if(log.isDebugEnabled()){
+ if(logExecutions){
+ log.info(" executions:");
for(NonLiteral completedExec : job.getCompleted()){
log.info(" - {} completed",getEngine(job.getExecutionMetadata(),
job.getExecutionNode(completedExec)));
@@ -354,16 +358,87 @@ public class EnhancementJobHandler imple
}
}
}
+ public class EnhancementJobObserver{
+     /** Lower bound (in ms) for the timed wait done by {@link #waitForCompletion(int)}. */
+     private static final int MIN_WAIT_TIME = 500;
+     private final EnhancementJob enhancementJob;
+     private final Semaphore semaphore;
+     /** Creates an observer with a single-permit semaphore; throws IllegalArgumentException for a null job. */
+     private EnhancementJobObserver(EnhancementJob job){
+         if(job == null){
+             throw new IllegalArgumentException("The parsed EnhancementJob MUST NOT be NULL!");
+         }
+         this.enhancementJob = job;
+         this.semaphore = new Semaphore(1);
+     }
+     /** Takes the semaphore permit; if interrupted the permit is NOT taken and only a warning is logged. */
+     protected void acquire() {
+         try {
+             semaphore.acquire();
+         } catch (InterruptedException e) {
+             log.warn("Interrupted while acquireing Semaphore for EnhancementJob "
+                     + enhancementJob + "!",e);
+         }
+     }
+     /** Returns a permit to the semaphore, waking a thread blocked in {@link #waitForCompletion(int)}. */
+     protected void release() {
+         semaphore.release();
+     }
+     /** @return the value of {@link EnhancementJob#isFinished()}, read while holding the job's read lock */
+     public boolean hasCompleted() {
+         enhancementJob.getLock().readLock().lock();
+         try {
+             return enhancementJob.isFinished();
+         } finally {
+             enhancementJob.getLock().readLock().unlock();
+         }
+     }
+     /** Blocks for at most max(MIN_WAIT_TIME, maxEnhancementJobWaitTime) ms; may return before the job finished. */
+     public void waitForCompletion(int maxEnhancementJobWaitTime) {
+         if(semaphore.availablePermits() < 1){
+             // The only permit is taken by the EnhancementJobHander
+             try {
+                 semaphore.tryAcquire(1,
+                     Math.max(MIN_WAIT_TIME, maxEnhancementJobWaitTime),TimeUnit.MILLISECONDS);
+             } catch (InterruptedException e) {
+                 // not re-interrupted on purpose: tryAcquire throws at once on a set flag, so a polling caller would spin
+             }
+         } else if(!hasCompleted()){
+             int wait = Math.max(100, maxEnhancementJobWaitTime/10);
+             log.warn("Unexpected permit available for Semaphore of "
+                     + "EnhancementJob of ContentItem {}. Fallback to wait({})"
+                     + "for detecting if Job has finished. While the fallback "
+                     + "should ensure correct Enhancement results this indicates a "
+                     + "Bug in the EventHobManager. Please feel free to report "
+                     + "This on dev@stanbol.apache.org or the Apache Stanbol "
+                     + "Issue Tracker.",enhancementJob.getContentItem().getUri(),wait);
+             try {
+                 // sleep instead of Thread.currentThread().wait(..): wait() without owning the monitor throws IllegalMonitorStateException
+                 Thread.sleep(wait);
+             } catch (InterruptedException e) {
+                 // interrupted: return early so the caller can re-check hasCompleted()
+             }
+         }// else completed
+     }
+ }
+
+
/**
* Currently only used to debug the number of currently registered
* Enhancements Jobs (if there are some)
* @author Rupert Westenthaler
*/
- private class EnhancementJobObserver implements Runnable {
+ private class EnhancementJobObserverDaemon implements Runnable {
+ /**
+ * The logger of the Observer. Can be used to configure Loglevel specificly
+ *
+ */
+ private Logger observerLog = LoggerFactory.getLogger(EnhancementJobObserverDaemon.class);
+
@Override
public void run() {
- log.debug(" ... init EnhancementJobObserver");
+ observerLog.debug(" ... init EnhancementJobObserver");
while(processingJobs != null){
try {
Thread.sleep(10000);
@@ -382,13 +457,13 @@ public class EnhancementJobHandler imple
readLock.unlock();
}
if(!jobs.isEmpty()){
- log.info(" -- {} active Enhancement Jobs",jobs.size());
- if(log.isDebugEnabled()){
+ observerLog.info(" -- {} active Enhancement Jobs",jobs.size());
+ if(observerLog.isDebugEnabled()){
for(EnhancementJob job : jobs){
Lock jobLock = job.getLock().readLock();
jobLock.lock();
try {
- logJobInfo(job,null);
+ logJobInfo(observerLog,job,null,true);
} finally {
jobLock.unlock();
}
Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java?rev=1398696&r1=1398695&r2=1398696&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java Tue Oct 16 08:37:08 2012
@@ -21,6 +21,7 @@ import static org.apache.stanbol.enhance
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
+import java.util.concurrent.Semaphore;
import org.apache.clerezza.rdf.core.Graph;
import org.apache.felix.scr.annotations.Activate;
@@ -30,6 +31,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler.EnhancementJobObserver;
import org.apache.stanbol.enhancer.servicesapi.Chain;
import org.apache.stanbol.enhancer.servicesapi.ChainException;
import org.apache.stanbol.enhancer.servicesapi.ChainManager;
@@ -58,6 +60,8 @@ public class EventJobManagerImpl impleme
private final Logger log = LoggerFactory.getLogger(EventJobManagerImpl.class);
public static final int DEFAULT_SERVICE_RANKING = 0;
+
+ private static final int MAX_ENHANCEMENT_JOB_WAIT_TIME = 10*1000;
@Reference
protected ChainManager chainManager;
@@ -127,16 +131,10 @@ public class EventJobManagerImpl impleme
EnhancementJob job = new EnhancementJob(ci, chain.getName(), chain.getExecutionPlan(),isDefaultChain);
//start the execution
//wait for the results
- Object object = jobHandler.register(job);
- while(!job.isFinished() & jobHandler != null){
- synchronized (object) {
- try {
- object.wait();
- } catch (InterruptedException e) {
- log.debug("Interupped for EnhancementJob if ContentItem {}",
- job.getContentItem().getUri());
- }
- }
+ EnhancementJobObserver observer = jobHandler.register(job);
+ //TODO: allow configuring a max completion time (e.g. 1min)
+ while(!observer.hasCompleted() & jobHandler != null){
+ observer.waitForCompletion(MAX_ENHANCEMENT_JOB_WAIT_TIME);
}
log.info("{} EnhancementJob for ContentItem {} after {}ms",
new Object[]{ job.isFailed() ? "Failed" : "Finished",