You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/10/16 10:37:09 UTC

svn commit: r1398696 - in /stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl: EnhancementJobHandler.java EventJobManagerImpl.java

Author: rwesten
Date: Tue Oct 16 08:37:08 2012
New Revision: 1398696

URL: http://svn.apache.org/viewvc?rev=1398696&view=rev
Log:
STANBOL-779: Adds a new EnhancementJobObserver class to the Event Job Manager that is used to track when the asyc. executed enhancement job finishes. The new class ensures that a completion of the enhancement process is correctly detected even if the completion was before the initial check.

Modified:
    stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
    stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java

Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java?rev=1398696&r1=1398695&r2=1398696&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java Tue Oct 16 08:37:08 2012
@@ -25,13 +25,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -40,13 +38,14 @@ import org.apache.clerezza.rdf.core.NonL
 import org.apache.stanbol.enhancer.servicesapi.EngineException;
 import org.apache.stanbol.enhancer.servicesapi.EnhancementEngine;
 import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager;
-import org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ibm.icu.lang.UCharacter.SentenceBreak;
+
 public class EnhancementJobHandler implements EventHandler {
 
     private EnhancementEngineManager engineManager;
@@ -69,7 +68,7 @@ public class EnhancementJobHandler imple
      * contentItems and the values are the objects used to interrupt the 
      * requesting thread as soon as the enhancement process has finished. 
      */
-    private Map<EnhancementJob,Object> processingJobs;
+    private Map<EnhancementJob,EnhancementJobObserver> processingJobs;
     private final ReadWriteLock processingLock = new ReentrantReadWriteLock();
     private Thread observerDaemon;
     
@@ -85,11 +84,11 @@ public class EnhancementJobHandler imple
         this.engineManager = engineManager;
         processingLock.writeLock().lock();
         try {
-            processingJobs = new LinkedHashMap<EnhancementJob,Object>();
+            processingJobs = new LinkedHashMap<EnhancementJob,EnhancementJobObserver>();
         } finally{
             processingLock.writeLock().unlock();
         }
-        observerDaemon = new Thread(new EnhancementJobObserver());
+        observerDaemon = new Thread(new EnhancementJobObserverDaemon());
         observerDaemon.setName("Event Job Manager Observer Daemon");
         observerDaemon.setDaemon(true);
         observerDaemon.start();
@@ -134,19 +133,19 @@ public class EnhancementJobHandler imple
      * @return An object that will get {@link Object#notifyAll()} as soon as
      * {@link EnhancementJob#isFinished()} or this instance is deactivated
      */
-    public Object register(EnhancementJob enhancementJob){
+    public EnhancementJobObserver register(EnhancementJob enhancementJob){
         final boolean init;
-        Object o;
+        EnhancementJobObserver observer;
         processingLock.writeLock().lock();
         try {
             if(enhancementJob == null || processingJobs == null){
                 return null;
             }
-            o = processingJobs.get(enhancementJob);
-            if(o == null){
-                o = new Object();
-                logJobInfo(enhancementJob, "Add EnhancementJob:");
-                processingJobs.put(enhancementJob, o);
+            observer = processingJobs.get(enhancementJob);
+            if(observer == null){
+                observer = new EnhancementJobObserver(enhancementJob);
+                logJobInfo(log, enhancementJob, "Add EnhancementJob:",false);
+                processingJobs.put(enhancementJob, observer);
                 init = true;
             } else {
                 init = false;
@@ -155,6 +154,7 @@ public class EnhancementJobHandler imple
             processingLock.writeLock().unlock();
         }
         if(init){
+            observer.acquire();
             enhancementJob.startProcessing();
             log.debug("++ w: {}","init execution");
             enhancementJob.getLock().writeLock().lock();
@@ -166,7 +166,7 @@ public class EnhancementJobHandler imple
                 enhancementJob.getLock().writeLock().unlock();
             }
         }
-        return o;
+        return observer;
     }
 
     @Override
@@ -283,18 +283,21 @@ public class EnhancementJobHandler imple
      */
     private void finish(EnhancementJob job){
         processingLock.writeLock().lock();
-        Object o;
+        EnhancementJobObserver observer;
         try {
-            o = processingJobs.remove(job);
+            observer = processingJobs.remove(job);
         } finally {
             processingLock.writeLock().unlock();
         }
-        if(o != null) {
-            synchronized (o) {
-                logJobInfo(job, "Finished EnhancementJob:");
+        if(observer != null) {
+            try {
+                logJobInfo(log, job, "Finished EnhancementJob:",false);
                 log.debug("++ n: finished processing ContentItem {} with Chain {}",
                     job.getContentItem().getUri(),job.getChainName());
-                o.notifyAll();
+            } finally {
+                //release the semaphore to send signal to the EventJobManager waiting
+                //for the results
+                observer.release();
             }
         } else {
             log.warn("EnhancementJob for ContentItem {} is not " +
@@ -335,15 +338,16 @@ public class EnhancementJobHandler imple
      * Logs basic infos about the Job as INFO and detailed infos as DEBUG
      * @param job
      */
-    protected void logJobInfo(EnhancementJob job, String header) {
+    protected static void logJobInfo(Logger log, EnhancementJob job, String header, boolean logExecutions) {
         if(header != null){
             log.info(header);
         }
-        log.info("   state: {}",job.isFinished()?"finished":job.isFailed()?"failed":"processing");
-        log.info("   chain: {}",job.getChainName());
+        log.info("   finished:     {}",job.isFinished());
+        log.info("   state:        {}",job.isFailed()?"failed":"processing");
+        log.info("   chain:        {}",job.getChainName());
         log.info("   content-item: {}", job.getContentItem().getUri());
-        log.debug("   executions:");
-        if(log.isDebugEnabled()){
+        if(logExecutions){
+            log.info("  executions:");
             for(NonLiteral completedExec : job.getCompleted()){
                 log.info("    - {} completed",getEngine(job.getExecutionMetadata(), 
                     job.getExecutionNode(completedExec)));
@@ -354,16 +358,87 @@ public class EnhancementJobHandler imple
             }
         }
     }
+    public class EnhancementJobObserver{
+        
+        private static final int MIN_WAIT_TIME = 500;
+        private final EnhancementJob enhancementJob;
+        private final Semaphore semaphore;
+        
+        private EnhancementJobObserver(EnhancementJob job){
+            if(job == null){
+                throw new IllegalArgumentException("The parsed EnhancementJob MUST NOT be NULL!");
+            }
+            this.enhancementJob = job;
+            this.semaphore = new Semaphore(1);
+        }
+
+        protected void acquire() {
+            try {
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+                log.warn("Interrupted while acquireing Semaphore for EnhancementJob "
+                        + enhancementJob + "!",e);
+            }
+        }
+        
+        protected void release() {
+            semaphore.release();
+        }
+
+        public boolean hasCompleted() {
+            enhancementJob.getLock().readLock().lock();
+            try {
+                return enhancementJob.isFinished();
+            } finally {
+                enhancementJob.getLock().readLock().unlock();
+            }
+        }
+
+        public void waitForCompletion(int maxEnhancementJobWaitTime) {
+            if(semaphore.availablePermits() < 1){
+                // The only permit is taken by the EnhancementJobHander
+                try {
+                    semaphore.tryAcquire(1,
+                        Math.max(MIN_WAIT_TIME, maxEnhancementJobWaitTime),TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    //interupted
+                }
+            } else if(!hasCompleted()){
+                int wait = Math.max(100, maxEnhancementJobWaitTime/10);
+                log.warn("Unexpected permit available for Semaphore of "
+                    + "EnhancementJob of ContentItem {}. Fallback to wait({})"
+                    + "for detecting if Job has finished. While the fallback "
+                    + "should ensure correct Enhancement results this indicates a "
+                    + "Bug in the EventHobManager. Please feel free to report "
+                    + "This on dev@stanbol.apache.org or the Apache Stanbol "
+                    + "Issue Tracker.",enhancementJob.getContentItem().getUri(),wait);
+                try {
+                    Thread.currentThread().wait(wait);
+                } catch (InterruptedException e) {
+                    //interupted
+                }
+            }// else completed
+        }
+        
+    }
+    
+    
     /**
      * Currently only used to debug the number of currently registered
      * Enhancements Jobs (if there are some)
      * @author Rupert Westenthaler
      */
-    private class EnhancementJobObserver implements Runnable {
+    private class EnhancementJobObserverDaemon implements Runnable {
 
+        /**
+         * The logger of the Observer. Can be used to configure Loglevel specificly
+         * 
+         */
+        private Logger observerLog = LoggerFactory.getLogger(EnhancementJobObserverDaemon.class);
+        
         @Override
         public void run() {
-            log.debug(" ... init EnhancementJobObserver");
+            observerLog.debug(" ... init EnhancementJobObserver");
             while(processingJobs != null){
                 try {
                     Thread.sleep(10000);
@@ -382,13 +457,13 @@ public class EnhancementJobHandler imple
                     readLock.unlock();
                 }
                 if(!jobs.isEmpty()){
-                    log.info(" -- {} active Enhancement Jobs",jobs.size());
-                    if(log.isDebugEnabled()){
+                    observerLog.info(" -- {} active Enhancement Jobs",jobs.size());
+                    if(observerLog.isDebugEnabled()){
                         for(EnhancementJob job : jobs){
                             Lock jobLock = job.getLock().readLock();
                             jobLock.lock();
                             try {
-                                logJobInfo(job,null);
+                                logJobInfo(observerLog,job,null,true);
                             } finally {
                                 jobLock.unlock();
                             }

Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java?rev=1398696&r1=1398695&r2=1398696&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java Tue Oct 16 08:37:08 2012
@@ -21,6 +21,7 @@ import static org.apache.stanbol.enhance
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.List;
+import java.util.concurrent.Semaphore;
 
 import org.apache.clerezza.rdf.core.Graph;
 import org.apache.felix.scr.annotations.Activate;
@@ -30,6 +31,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler.EnhancementJobObserver;
 import org.apache.stanbol.enhancer.servicesapi.Chain;
 import org.apache.stanbol.enhancer.servicesapi.ChainException;
 import org.apache.stanbol.enhancer.servicesapi.ChainManager;
@@ -58,6 +60,8 @@ public class EventJobManagerImpl impleme
     private final Logger log = LoggerFactory.getLogger(EventJobManagerImpl.class);
     
     public static final int DEFAULT_SERVICE_RANKING = 0;
+
+    private static final int MAX_ENHANCEMENT_JOB_WAIT_TIME = 10*1000;
     
     @Reference
     protected ChainManager chainManager;
@@ -127,16 +131,10 @@ public class EventJobManagerImpl impleme
         EnhancementJob job = new EnhancementJob(ci, chain.getName(), chain.getExecutionPlan(),isDefaultChain);
         //start the execution
         //wait for the results
-        Object object = jobHandler.register(job);
-        while(!job.isFinished() & jobHandler != null){
-            synchronized (object) {
-                try {
-                    object.wait();
-                } catch (InterruptedException e) {
-                    log.debug("Interupped for EnhancementJob if ContentItem {}",
-                        job.getContentItem().getUri());
-                }
-            }
+        EnhancementJobObserver observer = jobHandler.register(job);
+        //TODO: allow configuring a max completion time (e.g. 1min)
+        while(!observer.hasCompleted() & jobHandler != null){
+            observer.waitForCompletion(MAX_ENHANCEMENT_JOB_WAIT_TIME);
         }
         log.info("{} EnhancementJob for ContentItem {} after {}ms",
             new Object[]{ job.isFailed() ? "Failed" : "Finished",