You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/12/09 07:27:15 UTC

svn commit: r1418821 - in /stanbol/trunk/enhancer: defaults/src/main/resources/config/ jobmanager/event/ jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/

Author: rwesten
Date: Sun Dec  9 06:27:13 2012
New Revision: 1418821

URL: http://svn.apache.org/viewvc?rev=1418821&view=rev
Log:
fix for STANBOL-830 and impplementation of STANBOL-831: The EventJobManager is no capable to concurrently process multiple ContentItems with the same URI

Modified:
    stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg
    stanbol/trunk/enhancer/jobmanager/event/pom.xml
    stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
    stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
    stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java

Modified: stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg (original)
+++ stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg Sun Dec  9 06:27:13 2012
@@ -22,4 +22,6 @@
 # might take a lot of time. For such cases the IgnoreTimeout property can be
 # used to exclude specific EventHandlers rather than deactivating this feature
 # all together.
-org.apache.felix.eventadmin.IgnoreTimeout=org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler
\ No newline at end of file
+org.apache.felix.eventadmin.IgnoreTimeout=org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler
+org.apache.felix.eventadmin.ThreadPoolSize=20
+org.apache.felix.eventadmin.CacheSize=1024
\ No newline at end of file

Modified: stanbol/trunk/enhancer/jobmanager/event/pom.xml
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/pom.xml?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/pom.xml (original)
+++ stanbol/trunk/enhancer/jobmanager/event/pom.xml Sun Dec  9 06:27:13 2012
@@ -84,7 +84,7 @@
     <dependency>
       <groupId>org.apache.stanbol</groupId>
       <artifactId>org.apache.stanbol.enhancer.servicesapi</artifactId>
-      <version>0.9.0-incubating</version>
+      <version>0.10.0-SNAPSHOT</version>
     </dependency>
 
     <dependency>

Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java Sun Dec  9 06:27:13 2012
@@ -330,13 +330,13 @@ public class EnhancementJob {
      * @return the currently running executions.
      */
     public Set<NonLiteral> getRunning() {
-        log.debug("++ r: {}","getRunning");
+        log.trace("++ r: {}","getRunning");
         readLock.lock();
         try {
-            log.debug(">> r: {}","getRunning");
+            log.trace(">> r: {}","getRunning");
             return runningExec;
         } finally {
-            log.debug("<< r: {}","getRunning");
+            log.trace("<< r: {}","getRunning");
             readLock.unlock();
         }
     }
@@ -347,13 +347,13 @@ public class EnhancementJob {
      * @return the completed execution nodes
      */
     public Set<NonLiteral> getCompleted() {
-        log.debug("++ r: {}","getCompleted");
+        log.trace("++ r: {}","getCompleted");
         readLock.lock();
         try {
-            log.debug(">> r: {}","getCompleted");
+            log.trace(">> r: {}","getCompleted");
             return completedExec;
         } finally {
-            log.debug("<< r: {}","getCompleted");
+            log.trace("<< r: {}","getCompleted");
             readLock.unlock();
         }
     }
@@ -377,13 +377,13 @@ public class EnhancementJob {
         }
         writeLock.lock();
         NonLiteral executionNode = getExecutionNode(execution);
-        log.debug("++ w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
+        log.trace("++ w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
         try {
-            log.debug(">> w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
+            log.trace(">> w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
             setNodeCompleted(executionNode);
             setExecutionCompleted(executionMetadata, execution, null);
         } finally {
-            log.debug("<< w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
+            log.trace("<< w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
             writeLock.unlock();
         }
     }
@@ -420,7 +420,7 @@ public class EnhancementJob {
                     + " | chain.running " + running + ")!");
         }
         if (running.remove(executionNode)) {
-            log.debug(
+            log.trace(
                 "Execution of '{}' for ContentItem {} completed "
                 + "(chain: {}, node: {}, optional {})",
                 new Object[] {engine, contentItem.getUri().getUnicodeString(), 
@@ -455,10 +455,10 @@ public class EnhancementJob {
         String engine = getEngine(executionPlan, executionNode);
         boolean optional = isOptional(executionPlan, executionNode);
         Set<NonLiteral> dependsOn = getDependend(executionPlan, executionNode);
-        log.debug("++ w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+        log.trace("++ w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
         writeLock.lock();
         try {
-            log.debug(">> w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+            log.trace(">> w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
             if (completed.contains(executionNode)) {
                 String message = "Unable to set state of ExectionNode '" + executionNode + "'(chain '"
                                  + chain + "' | contentItem '" + contentItem.getUri()
@@ -490,7 +490,7 @@ public class EnhancementJob {
                                        chain, executionNode, optional});
                 return;
             } else { //added an engine to running
-                log.debug("Started Execution of '{}' for ContentItem {} "
+                log.trace("Started Execution of '{}' for ContentItem {} "
                          + "(chain: {}, node: {}, optional {})",
                     new Object[] {engine, contentItem.getUri().getUnicodeString(), chain,
                                   executionNode, optional});
@@ -502,7 +502,7 @@ public class EnhancementJob {
                 checkExecutable();
             }
         } finally {
-            log.debug("<< w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+            log.trace("<< w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
             writeLock.unlock();
         }
     }
@@ -542,7 +542,7 @@ public class EnhancementJob {
                 for(NonLiteral node : executeableNodes){
                     engines.add(getEngine(executionPlan, node));
                 }
-                log.debug("MARK {} as executeable",engines);
+                log.trace("MARK {} as executeable",engines);
             }
             //we need to get the em:Executables for the ep:ExecutionNodes ...
             if(executeableNodes.isEmpty()){
@@ -571,13 +571,13 @@ public class EnhancementJob {
      * currently running engines.
      */
     public Set<NonLiteral> getExecutable(){
-        log.debug("++ r: {}","getExecutable");
+        log.trace("++ r: {}","getExecutable");
         readLock.lock();
-        log.debug(">> r: {}","getExecutable");
+        log.trace(">> r: {}","getExecutable");
         try {
             return executable;
         } finally {
-            log.debug("<< r: {}:{}","getExecutable",executable);
+            log.trace("<< r: {}:{}","getExecutable",executable);
             readLock.unlock();  
         }
     }
@@ -586,14 +586,14 @@ public class EnhancementJob {
      * @return if this enhancement job is finished.
      */
     public boolean isFinished(){
-        log.debug("++ r: {}","isFinished");
+        log.trace("++ r: {}","isFinished");
         readLock.lock();
         try {
-            log.debug(">> r: {}","isFinished");
+            log.trace(">> r: {}","isFinished");
             return running.isEmpty() && // wait for running engine (regard if failed or not)
                     (executable.isEmpty() || isFailed()); //no more engines or already failed
         } finally {
-            log.debug("<< r: {}","isFinished");
+            log.trace("<< r: {}","isFinished");
             readLock.unlock();
         }
     }
@@ -605,10 +605,10 @@ public class EnhancementJob {
         NonLiteral executionNode = getExecutionNode(execution);
         final boolean optional = isOptional(executionPlan, executionNode);
         final String engineName = getEngine(executionPlan, executionNode);
-        log.debug("++ w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+        log.trace("++ w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
         writeLock.lock();
         try {
-            log.debug(">> w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+            log.trace(">> w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
             StringBuilder message = new StringBuilder();
             message.append(String.format("Unable to process ContentItem '%s' with " +
             		"Enhancement Engine '%s' because the engine ", 
@@ -637,7 +637,7 @@ public class EnhancementJob {
                 //re-throwing by the EnhancementJobManager.
             }
         } finally {
-            log.debug("<< w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+            log.trace("<< w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
             writeLock.unlock();
         }
 
@@ -648,26 +648,29 @@ public class EnhancementJob {
      * @return if the EnhancementJob has failed or not.
      */
     public boolean isFailed() {
-        log.debug("++ r: {}","isFailed");
+        log.trace("++ r: {}","isFailed");
         readLock.lock();
         try {
-            log.debug(">> r: {}","isFailed");
+            log.trace(">> r: {}","isFailed");
             return isExecutionFailed(executionMetadata, chainExecutionNode);
         } finally {
-            log.debug("<< r: {}","isFailed");
+            log.trace("<< r: {}","isFailed");
             readLock.unlock();
         }
     }
-    
-    @Override
-    public int hashCode() {
-        return contentItem.getUri().hashCode();
-    }
-    @Override
-    public boolean equals(Object o) {
-        return o instanceof EnhancementJob && 
-                contentItem.getUri().equals(((EnhancementJob)o).contentItem.getUri());
-    }
+// NOTE: use default implementations of hashCode and equals for now as we need
+//       to support the concurrent enhancement of ContentItems with the same
+//       URI. Also two ContentItems with the same URI might still have other
+//       content (as users can manually parse the URI in the request). 
+//    @Override
+//    public int hashCode() {
+//        return contentItem.getUri().hashCode();
+//    }
+//    @Override
+//    public boolean equals(Object o) {
+//        return o instanceof EnhancementJob && 
+//                contentItem.getUri().equals(((EnhancementJob)o).contentItem.getUri());
+//    }
     @Override
     public String toString() {
         return "EnhancementJob for ContentItem "+contentItem.getUri();

Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java Sun Dec  9 06:27:13 2012
@@ -44,8 +44,6 @@ import org.osgi.service.event.EventHandl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ibm.icu.lang.UCharacter.SentenceBreak;
-
 public class EnhancementJobHandler implements EventHandler {
 
     private EnhancementEngineManager engineManager;
@@ -144,10 +142,14 @@ public class EnhancementJobHandler imple
             observer = processingJobs.get(enhancementJob);
             if(observer == null){
                 observer = new EnhancementJobObserver(enhancementJob);
-                logJobInfo(log, enhancementJob, "Add EnhancementJob:",false);
+                if(log.isDebugEnabled()){
+                    logJobInfo(log, enhancementJob, "Add EnhancementJob:",log.isTraceEnabled());
+                }
                 processingJobs.put(enhancementJob, observer);
                 init = true;
             } else {
+                log.warn("Request to register an EnhancementJob for an ContentItem {} that is" +
+                		"already registered "+enhancementJob.getContentItem().getUri());
                 init = false;
             }
         } finally {
@@ -156,13 +158,19 @@ public class EnhancementJobHandler imple
         if(init){
             observer.acquire();
             enhancementJob.startProcessing();
-            log.debug("++ w: {}","init execution");
+            log.trace("++ w: {}","init execution");
             enhancementJob.getLock().writeLock().lock();
             try {
-                log.debug(">> w: {}","init execution");
-                executeNextNodes(enhancementJob);
+                log.trace(">> w: {}","init execution");
+                if(!executeNextNodes(enhancementJob)){
+                    String message = "Unable to start Execution of "+enhancementJob.getContentItem().getUri();
+                    log.warn(message);
+                    logJobInfo(log, enhancementJob, null, true);
+                    log.warn("finishing job ...");
+                    finish(enhancementJob);
+                }
             } finally {
-                log.debug("<< w: {}","init execution");
+                log.trace("<< w: {}","init execution");
                 enhancementJob.getLock().writeLock().unlock();
             }
         }
@@ -188,9 +196,9 @@ public class EnhancementJobHandler imple
            log.error(message,t);
         }
         //(2) trigger the next actions
-        log.debug("++ w: {}","check for next Executions");
+        log.trace("++ w: {}","check for next Executions");
         job.getLock().writeLock().lock();
-        log.debug(">> w: {}","check for next Executions");
+        log.trace(">> w: {}","check for next Executions");
         try {
             if(job.isFinished()){
                 finish(job);
@@ -213,7 +221,7 @@ public class EnhancementJobHandler imple
                 }
             }
         } finally {
-            log.debug("<< w: {}","check for next Executions");
+            log.trace("<< w: {}","check for next Executions");
             job.getLock().writeLock().unlock();
         }
     }
@@ -222,8 +230,8 @@ public class EnhancementJobHandler imple
      * @param execution
      */
     private void processEvent(EnhancementJob job, NonLiteral execution) {
-        NonLiteral executionNode = job.getExecutionNode(execution);
-        String engineName = getEngine(job.getExecutionPlan(), executionNode);
+        String engineName = getEngine(job.getExecutionPlan(), 
+            job.getExecutionNode(execution));
         //(1) execute the parsed ExecutionNode
         EnhancementEngine engine = engineManager.getEngine(engineName);
         if(engine != null){
@@ -241,27 +249,33 @@ public class EnhancementJobHandler imple
             }
             if(engineState == EnhancementEngine.ENHANCE_SYNCHRONOUS){
                 //ensure that this engine exclusively access the content item
-                log.debug("++ w: {}: {}","start sync execution", engine.getName());
+                log.trace("++ w: {}: {}","start sync execution", engine.getName());
                 job.getLock().writeLock().lock();
-                log.debug(">> w: {}: {}","start sync execution", engine.getName());
+                log.trace(">> w: {}: {}","start sync execution", engine.getName());
                 try {
                     engine.computeEnhancements(job.getContentItem());
                     job.setCompleted(execution);
                 } catch (EngineException e){
+                    log.warn(e.getMessage(),e);
+                    job.setFailed(execution, engine, e);
+                } catch (RuntimeException e){
+                    log.warn(e.getMessage(),e);
                     job.setFailed(execution, engine, e);
                 } finally{
-                    log.debug("<< w: {}: {}","finished sync execution", engine.getName());
+                    log.trace("<< w: {}: {}","finished sync execution", engine.getName());
                     job.getLock().writeLock().unlock();
                 }
             } else if(engineState == EnhancementEngine.ENHANCE_ASYNC){
                 try {
-                    log.debug("++ n: start async execution of Engine {}",engine.getName());
+                    log.trace("++ n: start async execution of Engine {}",engine.getName());
                     engine.computeEnhancements(job.getContentItem());
-                    log.debug("++ n: finished async execution of Engine {}",engine.getName());
+                    log.trace("++ n: finished async execution of Engine {}",engine.getName());
                     job.setCompleted(execution);
                 } catch (EngineException e) {
+                    log.warn(e.getMessage(),e);
                     job.setFailed(execution, engine, e);
                 } catch (RuntimeException e) {
+                    log.warn(e.getMessage(),e);
                     job.setFailed(execution, engine, e);
                 }
             } else { //CANNOT_ENHANCE
@@ -291,8 +305,10 @@ public class EnhancementJobHandler imple
         }
         if(observer != null) {
             try {
-                logJobInfo(log, job, "Finished EnhancementJob:",false);
-                log.debug("++ n: finished processing ContentItem {} with Chain {}",
+                if(log.isDebugEnabled()){
+                    logJobInfo(log, job, "Finished EnhancementJob:",log.isTraceEnabled());
+                }
+                log.trace("++ n: finished processing ContentItem {} with Chain {}",
                     job.getContentItem().getUri(),job.getChainName());
             } finally {
                 //release the semaphore to send signal to the EventJobManager waiting
@@ -316,16 +332,16 @@ public class EnhancementJobHandler imple
         //getExecutable returns an snapshot so we do not need to lock
         boolean startedExecution = false;
         for(NonLiteral executable : job.getExecutable()){
-            if(log.isDebugEnabled()){
-                log.debug("PREPARE execution of Engine {}",
+            if(log.isTraceEnabled()){
+                log.trace("PREPARE execution of Engine {}",
                     getEngine(job.getExecutionPlan(), job.getExecutionNode(executable)));
             }
             Dictionary<String,Object> properties = new Hashtable<String,Object>();
             properties.put(PROPERTY_JOB_MANAGER, job);
             properties.put(PROPERTY_EXECUTION, executable);
             job.setRunning(executable);
-            if(log.isDebugEnabled()){
-                log.debug("SHEDULE execution of Engine {}",
+            if(log.isTraceEnabled()){
+                log.trace("SHEDULE execution of Engine {}",
                     getEngine(job.getExecutionPlan(), job.getExecutionNode(executable)));
             }
             eventAdmin.postEvent(new Event(TOPIC_JOB_MANAGER,properties));
@@ -356,6 +372,16 @@ public class EnhancementJobHandler imple
                 log.info("    - {} running",getEngine(job.getExecutionMetadata(), 
                     job.getExecutionNode(runningExec)));
             }
+            for(NonLiteral executeable : job.getExecutable()){
+                log.info("    - {} executeable",getEngine(job.getExecutionMetadata(), 
+                    job.getExecutionNode(executeable)));
+            }
+        }
+        if(job.getErrorMessage() != null){
+            log.info("Error Message: {}",job.getErrorMessage());
+        }
+        if(job.getError() != null){
+            log.info("Reported Exception:",job.getError());
         }
     }
     public class EnhancementJobObserver{
@@ -394,14 +420,16 @@ public class EnhancementJobHandler imple
             }
         }
 
-        public void waitForCompletion(int maxEnhancementJobWaitTime) {
+        public boolean waitForCompletion(int maxEnhancementJobWaitTime) {
+            boolean acquire = false;
             if(semaphore.availablePermits() < 1){
                 // The only permit is taken by the EnhancementJobHander
                 try {
-                    semaphore.tryAcquire(1,
+                    acquire = semaphore.tryAcquire(1,
                         Math.max(MIN_WAIT_TIME, maxEnhancementJobWaitTime),TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     //interupted
+                    acquire = false;
                 }
             } else if(!hasCompleted()){
                 int wait = Math.max(100, maxEnhancementJobWaitTime/10);
@@ -417,9 +445,10 @@ public class EnhancementJobHandler imple
                 } catch (InterruptedException e) {
                     //interupted
                 }
+                acquire = true;
             }// else completed
+            return acquire;
         }
-        
     }
     
     
@@ -457,7 +486,7 @@ public class EnhancementJobHandler imple
                     readLock.unlock();
                 }
                 if(!jobs.isEmpty()){
-                    observerLog.info(" -- {} active Enhancement Jobs",jobs.size());
+                    observerLog.debug(" -- {} active Enhancement Jobs",jobs.size());
                     if(observerLog.isDebugEnabled()){
                         for(EnhancementJob job : jobs){
                             Lock jobLock = job.getLock().readLock();

Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java Sun Dec  9 06:27:13 2012
@@ -20,10 +20,12 @@ import static org.apache.stanbol.enhance
 
 import java.util.Dictionary;
 import java.util.Hashtable;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Semaphore;
+import java.util.Map.Entry;
 
 import org.apache.clerezza.rdf.core.Graph;
+import org.apache.clerezza.rdf.core.Triple;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,6 +43,8 @@ import org.apache.stanbol.enhancer.servi
 import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager;
 import org.apache.stanbol.enhancer.servicesapi.EnhancementJobManager;
 import org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper;
+import org.apache.stanbol.enhancer.servicesapi.helper.execution.Execution;
+import org.apache.stanbol.enhancer.servicesapi.helper.execution.ExecutionMetadata;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.ComponentContext;
@@ -64,7 +68,10 @@ public class EventJobManagerImpl impleme
 
     public static final String MAX_ENHANCEMENT_JOB_WAIT_TIME = "stanbol.maxEnhancementJobWaitTime";
 
-    public static final int DEFAULT_MAX_ENHANCEMENT_JOB_WAIT_TIME = 10 * 1000;
+    /**
+     * default max wait time is 60sec (similar to the http timeout)
+     */
+    public static final int DEFAULT_MAX_ENHANCEMENT_JOB_WAIT_TIME = 60 * 1000;
     
     @Reference
     protected ChainManager chainManager;
@@ -141,14 +148,22 @@ public class EventJobManagerImpl impleme
         //start the execution
         //wait for the results
         EnhancementJobObserver observer = jobHandler.register(job);
-        //TODO: allow configuring a max completion time (e.g. 1min)
-        while(!observer.hasCompleted() & jobHandler != null){
-            observer.waitForCompletion(maxEnhancementJobWaitTime);
-        }
-        log.info("{} EnhancementJob for ContentItem {} after {}ms",
-            new Object[]{ job.isFailed() ? "Failed" : "Finished",
-                    job.getContentItem().getUri(),
-                    System.currentTimeMillis()-start});
+        //now wait for the execution to finish for the configured maximum time
+        boolean completed = observer.waitForCompletion(maxEnhancementJobWaitTime);
+        if(!completed){ //throw timeout exception
+            StringBuilder sb = new StringBuilder("Status:\n");
+            ExecutionMetadata em = ExecutionMetadata.parseFrom(job.getExecutionMetadata(), ci.getUri());
+            for(Entry<String,Execution> ex : em.getEngineExecutions().entrySet()){
+                sb.append("  -").append(ex.getKey()).append(": ").append(ex.getValue().getStatus()).append('\n');
+            }
+            throw new ChainException("Execution timeout after {}sec "+sb.toString()
+                    +" \n To change the timeout change value of property '"+
+                    MAX_ENHANCEMENT_JOB_WAIT_TIME+"' for the service "+getClass());
+        }
+        log.info("Execution of Chain {} {} after {}ms for ContentItem {}",
+            new Object[]{ chain.getName(), job.isFailed() ? "failed" : "finished",
+                    System.currentTimeMillis()-start,
+                    job.getContentItem().getUri()});
         //NOTE: ExecutionMetadata are not added to the metadata of the ContentItem
         //      by the EnhancementJobManager.
         //      However one could add this as an optional feature to the
@@ -163,6 +178,12 @@ public class EventJobManagerImpl impleme
         	}
         }
         if(!job.isFinished()){
+            log.warn("Execution finished, but Job is not finished!");
+            EnhancementJobHandler.logJobInfo(log, job, null, true);
+            log.warn("ExecutionMetadata: ");
+            for(Iterator<Triple> it = job.getExecutionMetadata().iterator();
+                    it.hasNext();
+                    log.warn(it.next().toString()));
             throw new ChainException("EnhancementJobManager was deactivated while" +
             		" enhancing the passed ContentItem "+job.getContentItem()+
             		" (EnhancementJobManager type: "+getClass()+")");