You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/12/09 07:27:15 UTC
svn commit: r1418821 - in /stanbol/trunk/enhancer:
defaults/src/main/resources/config/ jobmanager/event/
jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/
Author: rwesten
Date: Sun Dec 9 06:27:13 2012
New Revision: 1418821
URL: http://svn.apache.org/viewvc?rev=1418821&view=rev
Log:
fix for STANBOL-830 and impplementation of STANBOL-831: The EventJobManager is no capable to concurrently process multiple ContentItems with the same URI
Modified:
stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg
stanbol/trunk/enhancer/jobmanager/event/pom.xml
stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java
Modified: stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg (original)
+++ stanbol/trunk/enhancer/defaults/src/main/resources/config/org.apache.felix.eventadmin.impl.EventAdmin.cfg Sun Dec 9 06:27:13 2012
@@ -22,4 +22,6 @@
# might take a lot of time. For such cases the IgnoreTimeout property can be
# used to exclude specific EventHandlers rather than deactivating this feature
# all together.
-org.apache.felix.eventadmin.IgnoreTimeout=org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler
\ No newline at end of file
+org.apache.felix.eventadmin.IgnoreTimeout=org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler
+org.apache.felix.eventadmin.ThreadPoolSize=20
+org.apache.felix.eventadmin.CacheSize=1024
\ No newline at end of file
Modified: stanbol/trunk/enhancer/jobmanager/event/pom.xml
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/pom.xml?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/pom.xml (original)
+++ stanbol/trunk/enhancer/jobmanager/event/pom.xml Sun Dec 9 06:27:13 2012
@@ -84,7 +84,7 @@
<dependency>
<groupId>org.apache.stanbol</groupId>
<artifactId>org.apache.stanbol.enhancer.servicesapi</artifactId>
- <version>0.9.0-incubating</version>
+ <version>0.10.0-SNAPSHOT</version>
</dependency>
<dependency>
Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java Sun Dec 9 06:27:13 2012
@@ -330,13 +330,13 @@ public class EnhancementJob {
* @return the currently running executions.
*/
public Set<NonLiteral> getRunning() {
- log.debug("++ r: {}","getRunning");
+ log.trace("++ r: {}","getRunning");
readLock.lock();
try {
- log.debug(">> r: {}","getRunning");
+ log.trace(">> r: {}","getRunning");
return runningExec;
} finally {
- log.debug("<< r: {}","getRunning");
+ log.trace("<< r: {}","getRunning");
readLock.unlock();
}
}
@@ -347,13 +347,13 @@ public class EnhancementJob {
* @return the completed execution nodes
*/
public Set<NonLiteral> getCompleted() {
- log.debug("++ r: {}","getCompleted");
+ log.trace("++ r: {}","getCompleted");
readLock.lock();
try {
- log.debug(">> r: {}","getCompleted");
+ log.trace(">> r: {}","getCompleted");
return completedExec;
} finally {
- log.debug("<< r: {}","getCompleted");
+ log.trace("<< r: {}","getCompleted");
readLock.unlock();
}
}
@@ -377,13 +377,13 @@ public class EnhancementJob {
}
writeLock.lock();
NonLiteral executionNode = getExecutionNode(execution);
- log.debug("++ w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
+ log.trace("++ w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
try {
- log.debug(">> w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
+ log.trace(">> w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
setNodeCompleted(executionNode);
setExecutionCompleted(executionMetadata, execution, null);
} finally {
- log.debug("<< w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
+ log.trace("<< w: {}: {}","setCompleted",getEngine(executionPlan, executionNode));
writeLock.unlock();
}
}
@@ -420,7 +420,7 @@ public class EnhancementJob {
+ " | chain.running " + running + ")!");
}
if (running.remove(executionNode)) {
- log.debug(
+ log.trace(
"Execution of '{}' for ContentItem {} completed "
+ "(chain: {}, node: {}, optional {})",
new Object[] {engine, contentItem.getUri().getUnicodeString(),
@@ -455,10 +455,10 @@ public class EnhancementJob {
String engine = getEngine(executionPlan, executionNode);
boolean optional = isOptional(executionPlan, executionNode);
Set<NonLiteral> dependsOn = getDependend(executionPlan, executionNode);
- log.debug("++ w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+ log.trace("++ w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
writeLock.lock();
try {
- log.debug(">> w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+ log.trace(">> w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
if (completed.contains(executionNode)) {
String message = "Unable to set state of ExectionNode '" + executionNode + "'(chain '"
+ chain + "' | contentItem '" + contentItem.getUri()
@@ -490,7 +490,7 @@ public class EnhancementJob {
chain, executionNode, optional});
return;
} else { //added an engine to running
- log.debug("Started Execution of '{}' for ContentItem {} "
+ log.trace("Started Execution of '{}' for ContentItem {} "
+ "(chain: {}, node: {}, optional {})",
new Object[] {engine, contentItem.getUri().getUnicodeString(), chain,
executionNode, optional});
@@ -502,7 +502,7 @@ public class EnhancementJob {
checkExecutable();
}
} finally {
- log.debug("<< w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+ log.trace("<< w: {}: {}","setRunning",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
writeLock.unlock();
}
}
@@ -542,7 +542,7 @@ public class EnhancementJob {
for(NonLiteral node : executeableNodes){
engines.add(getEngine(executionPlan, node));
}
- log.debug("MARK {} as executeable",engines);
+ log.trace("MARK {} as executeable",engines);
}
//we need to get the em:Executables for the ep:ExecutionNodes ...
if(executeableNodes.isEmpty()){
@@ -571,13 +571,13 @@ public class EnhancementJob {
* currently running engines.
*/
public Set<NonLiteral> getExecutable(){
- log.debug("++ r: {}","getExecutable");
+ log.trace("++ r: {}","getExecutable");
readLock.lock();
- log.debug(">> r: {}","getExecutable");
+ log.trace(">> r: {}","getExecutable");
try {
return executable;
} finally {
- log.debug("<< r: {}:{}","getExecutable",executable);
+ log.trace("<< r: {}:{}","getExecutable",executable);
readLock.unlock();
}
}
@@ -586,14 +586,14 @@ public class EnhancementJob {
* @return if this enhancement job is finished.
*/
public boolean isFinished(){
- log.debug("++ r: {}","isFinished");
+ log.trace("++ r: {}","isFinished");
readLock.lock();
try {
- log.debug(">> r: {}","isFinished");
+ log.trace(">> r: {}","isFinished");
return running.isEmpty() && // wait for running engine (regard if failed or not)
(executable.isEmpty() || isFailed()); //no more engines or already failed
} finally {
- log.debug("<< r: {}","isFinished");
+ log.trace("<< r: {}","isFinished");
readLock.unlock();
}
}
@@ -605,10 +605,10 @@ public class EnhancementJob {
NonLiteral executionNode = getExecutionNode(execution);
final boolean optional = isOptional(executionPlan, executionNode);
final String engineName = getEngine(executionPlan, executionNode);
- log.debug("++ w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+ log.trace("++ w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
writeLock.lock();
try {
- log.debug(">> w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+ log.trace(">> w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
StringBuilder message = new StringBuilder();
message.append(String.format("Unable to process ContentItem '%s' with " +
"Enhancement Engine '%s' because the engine ",
@@ -637,7 +637,7 @@ public class EnhancementJob {
//re-throwing by the EnhancementJobManager.
}
} finally {
- log.debug("<< w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
+ log.trace("<< w: {}: {}","setFailed",ExecutionPlanHelper.getEngine(executionPlan, executionNode));
writeLock.unlock();
}
@@ -648,26 +648,29 @@ public class EnhancementJob {
* @return if the EnhancementJob has failed or not.
*/
public boolean isFailed() {
- log.debug("++ r: {}","isFailed");
+ log.trace("++ r: {}","isFailed");
readLock.lock();
try {
- log.debug(">> r: {}","isFailed");
+ log.trace(">> r: {}","isFailed");
return isExecutionFailed(executionMetadata, chainExecutionNode);
} finally {
- log.debug("<< r: {}","isFailed");
+ log.trace("<< r: {}","isFailed");
readLock.unlock();
}
}
-
- @Override
- public int hashCode() {
- return contentItem.getUri().hashCode();
- }
- @Override
- public boolean equals(Object o) {
- return o instanceof EnhancementJob &&
- contentItem.getUri().equals(((EnhancementJob)o).contentItem.getUri());
- }
+// NOTE: use default implementations of hashCode and equals for now as we need
+// to support the concurrent enhancement of ContentItems with the same
+// URI. Also two ContentItems with the same URI might still have other
+// content (as users can manually parse the URI in the request).
+// @Override
+// public int hashCode() {
+// return contentItem.getUri().hashCode();
+// }
+// @Override
+// public boolean equals(Object o) {
+// return o instanceof EnhancementJob &&
+// contentItem.getUri().equals(((EnhancementJob)o).contentItem.getUri());
+// }
@Override
public String toString() {
return "EnhancementJob for ContentItem "+contentItem.getUri();
Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java Sun Dec 9 06:27:13 2012
@@ -44,8 +44,6 @@ import org.osgi.service.event.EventHandl
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ibm.icu.lang.UCharacter.SentenceBreak;
-
public class EnhancementJobHandler implements EventHandler {
private EnhancementEngineManager engineManager;
@@ -144,10 +142,14 @@ public class EnhancementJobHandler imple
observer = processingJobs.get(enhancementJob);
if(observer == null){
observer = new EnhancementJobObserver(enhancementJob);
- logJobInfo(log, enhancementJob, "Add EnhancementJob:",false);
+ if(log.isDebugEnabled()){
+ logJobInfo(log, enhancementJob, "Add EnhancementJob:",log.isTraceEnabled());
+ }
processingJobs.put(enhancementJob, observer);
init = true;
} else {
+ log.warn("Request to register an EnhancementJob for an ContentItem {} that is" +
+ "already registered "+enhancementJob.getContentItem().getUri());
init = false;
}
} finally {
@@ -156,13 +158,19 @@ public class EnhancementJobHandler imple
if(init){
observer.acquire();
enhancementJob.startProcessing();
- log.debug("++ w: {}","init execution");
+ log.trace("++ w: {}","init execution");
enhancementJob.getLock().writeLock().lock();
try {
- log.debug(">> w: {}","init execution");
- executeNextNodes(enhancementJob);
+ log.trace(">> w: {}","init execution");
+ if(!executeNextNodes(enhancementJob)){
+ String message = "Unable to start Execution of "+enhancementJob.getContentItem().getUri();
+ log.warn(message);
+ logJobInfo(log, enhancementJob, null, true);
+ log.warn("finishing job ...");
+ finish(enhancementJob);
+ }
} finally {
- log.debug("<< w: {}","init execution");
+ log.trace("<< w: {}","init execution");
enhancementJob.getLock().writeLock().unlock();
}
}
@@ -188,9 +196,9 @@ public class EnhancementJobHandler imple
log.error(message,t);
}
//(2) trigger the next actions
- log.debug("++ w: {}","check for next Executions");
+ log.trace("++ w: {}","check for next Executions");
job.getLock().writeLock().lock();
- log.debug(">> w: {}","check for next Executions");
+ log.trace(">> w: {}","check for next Executions");
try {
if(job.isFinished()){
finish(job);
@@ -213,7 +221,7 @@ public class EnhancementJobHandler imple
}
}
} finally {
- log.debug("<< w: {}","check for next Executions");
+ log.trace("<< w: {}","check for next Executions");
job.getLock().writeLock().unlock();
}
}
@@ -222,8 +230,8 @@ public class EnhancementJobHandler imple
* @param execution
*/
private void processEvent(EnhancementJob job, NonLiteral execution) {
- NonLiteral executionNode = job.getExecutionNode(execution);
- String engineName = getEngine(job.getExecutionPlan(), executionNode);
+ String engineName = getEngine(job.getExecutionPlan(),
+ job.getExecutionNode(execution));
//(1) execute the parsed ExecutionNode
EnhancementEngine engine = engineManager.getEngine(engineName);
if(engine != null){
@@ -241,27 +249,33 @@ public class EnhancementJobHandler imple
}
if(engineState == EnhancementEngine.ENHANCE_SYNCHRONOUS){
//ensure that this engine exclusively access the content item
- log.debug("++ w: {}: {}","start sync execution", engine.getName());
+ log.trace("++ w: {}: {}","start sync execution", engine.getName());
job.getLock().writeLock().lock();
- log.debug(">> w: {}: {}","start sync execution", engine.getName());
+ log.trace(">> w: {}: {}","start sync execution", engine.getName());
try {
engine.computeEnhancements(job.getContentItem());
job.setCompleted(execution);
} catch (EngineException e){
+ log.warn(e.getMessage(),e);
+ job.setFailed(execution, engine, e);
+ } catch (RuntimeException e){
+ log.warn(e.getMessage(),e);
job.setFailed(execution, engine, e);
} finally{
- log.debug("<< w: {}: {}","finished sync execution", engine.getName());
+ log.trace("<< w: {}: {}","finished sync execution", engine.getName());
job.getLock().writeLock().unlock();
}
} else if(engineState == EnhancementEngine.ENHANCE_ASYNC){
try {
- log.debug("++ n: start async execution of Engine {}",engine.getName());
+ log.trace("++ n: start async execution of Engine {}",engine.getName());
engine.computeEnhancements(job.getContentItem());
- log.debug("++ n: finished async execution of Engine {}",engine.getName());
+ log.trace("++ n: finished async execution of Engine {}",engine.getName());
job.setCompleted(execution);
} catch (EngineException e) {
+ log.warn(e.getMessage(),e);
job.setFailed(execution, engine, e);
} catch (RuntimeException e) {
+ log.warn(e.getMessage(),e);
job.setFailed(execution, engine, e);
}
} else { //CANNOT_ENHANCE
@@ -291,8 +305,10 @@ public class EnhancementJobHandler imple
}
if(observer != null) {
try {
- logJobInfo(log, job, "Finished EnhancementJob:",false);
- log.debug("++ n: finished processing ContentItem {} with Chain {}",
+ if(log.isDebugEnabled()){
+ logJobInfo(log, job, "Finished EnhancementJob:",log.isTraceEnabled());
+ }
+ log.trace("++ n: finished processing ContentItem {} with Chain {}",
job.getContentItem().getUri(),job.getChainName());
} finally {
//release the semaphore to send signal to the EventJobManager waiting
@@ -316,16 +332,16 @@ public class EnhancementJobHandler imple
//getExecutable returns an snapshot so we do not need to lock
boolean startedExecution = false;
for(NonLiteral executable : job.getExecutable()){
- if(log.isDebugEnabled()){
- log.debug("PREPARE execution of Engine {}",
+ if(log.isTraceEnabled()){
+ log.trace("PREPARE execution of Engine {}",
getEngine(job.getExecutionPlan(), job.getExecutionNode(executable)));
}
Dictionary<String,Object> properties = new Hashtable<String,Object>();
properties.put(PROPERTY_JOB_MANAGER, job);
properties.put(PROPERTY_EXECUTION, executable);
job.setRunning(executable);
- if(log.isDebugEnabled()){
- log.debug("SHEDULE execution of Engine {}",
+ if(log.isTraceEnabled()){
+ log.trace("SHEDULE execution of Engine {}",
getEngine(job.getExecutionPlan(), job.getExecutionNode(executable)));
}
eventAdmin.postEvent(new Event(TOPIC_JOB_MANAGER,properties));
@@ -356,6 +372,16 @@ public class EnhancementJobHandler imple
log.info(" - {} running",getEngine(job.getExecutionMetadata(),
job.getExecutionNode(runningExec)));
}
+ for(NonLiteral executeable : job.getExecutable()){
+ log.info(" - {} executeable",getEngine(job.getExecutionMetadata(),
+ job.getExecutionNode(executeable)));
+ }
+ }
+ if(job.getErrorMessage() != null){
+ log.info("Error Message: {}",job.getErrorMessage());
+ }
+ if(job.getError() != null){
+ log.info("Reported Exception:",job.getError());
}
}
public class EnhancementJobObserver{
@@ -394,14 +420,16 @@ public class EnhancementJobHandler imple
}
}
- public void waitForCompletion(int maxEnhancementJobWaitTime) {
+ public boolean waitForCompletion(int maxEnhancementJobWaitTime) {
+ boolean acquire = false;
if(semaphore.availablePermits() < 1){
// The only permit is taken by the EnhancementJobHander
try {
- semaphore.tryAcquire(1,
+ acquire = semaphore.tryAcquire(1,
Math.max(MIN_WAIT_TIME, maxEnhancementJobWaitTime),TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//interupted
+ acquire = false;
}
} else if(!hasCompleted()){
int wait = Math.max(100, maxEnhancementJobWaitTime/10);
@@ -417,9 +445,10 @@ public class EnhancementJobHandler imple
} catch (InterruptedException e) {
//interupted
}
+ acquire = true;
}// else completed
+ return acquire;
}
-
}
@@ -457,7 +486,7 @@ public class EnhancementJobHandler imple
readLock.unlock();
}
if(!jobs.isEmpty()){
- observerLog.info(" -- {} active Enhancement Jobs",jobs.size());
+ observerLog.debug(" -- {} active Enhancement Jobs",jobs.size());
if(observerLog.isDebugEnabled()){
for(EnhancementJob job : jobs){
Lock jobLock = job.getLock().readLock();
Modified: stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java?rev=1418821&r1=1418820&r2=1418821&view=diff
==============================================================================
--- stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java (original)
+++ stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EventJobManagerImpl.java Sun Dec 9 06:27:13 2012
@@ -20,10 +20,12 @@ import static org.apache.stanbol.enhance
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.Semaphore;
+import java.util.Map.Entry;
import org.apache.clerezza.rdf.core.Graph;
+import org.apache.clerezza.rdf.core.Triple;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,6 +43,8 @@ import org.apache.stanbol.enhancer.servi
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager;
import org.apache.stanbol.enhancer.servicesapi.EnhancementJobManager;
import org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper;
+import org.apache.stanbol.enhancer.servicesapi.helper.execution.Execution;
+import org.apache.stanbol.enhancer.servicesapi.helper.execution.ExecutionMetadata;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
@@ -64,7 +68,10 @@ public class EventJobManagerImpl impleme
public static final String MAX_ENHANCEMENT_JOB_WAIT_TIME = "stanbol.maxEnhancementJobWaitTime";
- public static final int DEFAULT_MAX_ENHANCEMENT_JOB_WAIT_TIME = 10 * 1000;
+ /**
+ * default max wait time is 60sec (similar to the http timeout)
+ */
+ public static final int DEFAULT_MAX_ENHANCEMENT_JOB_WAIT_TIME = 60 * 1000;
@Reference
protected ChainManager chainManager;
@@ -141,14 +148,22 @@ public class EventJobManagerImpl impleme
//start the execution
//wait for the results
EnhancementJobObserver observer = jobHandler.register(job);
- //TODO: allow configuring a max completion time (e.g. 1min)
- while(!observer.hasCompleted() & jobHandler != null){
- observer.waitForCompletion(maxEnhancementJobWaitTime);
- }
- log.info("{} EnhancementJob for ContentItem {} after {}ms",
- new Object[]{ job.isFailed() ? "Failed" : "Finished",
- job.getContentItem().getUri(),
- System.currentTimeMillis()-start});
+ //now wait for the execution to finish for the configured maximum time
+ boolean completed = observer.waitForCompletion(maxEnhancementJobWaitTime);
+ if(!completed){ //throw timeout exception
+ StringBuilder sb = new StringBuilder("Status:\n");
+ ExecutionMetadata em = ExecutionMetadata.parseFrom(job.getExecutionMetadata(), ci.getUri());
+ for(Entry<String,Execution> ex : em.getEngineExecutions().entrySet()){
+ sb.append(" -").append(ex.getKey()).append(": ").append(ex.getValue().getStatus()).append('\n');
+ }
+ throw new ChainException("Execution timeout after {}sec "+sb.toString()
+ +" \n To change the timeout change value of property '"+
+ MAX_ENHANCEMENT_JOB_WAIT_TIME+"' for the service "+getClass());
+ }
+ log.info("Execution of Chain {} {} after {}ms for ContentItem {}",
+ new Object[]{ chain.getName(), job.isFailed() ? "failed" : "finished",
+ System.currentTimeMillis()-start,
+ job.getContentItem().getUri()});
//NOTE: ExecutionMetadata are not added to the metadata of the ContentItem
// by the EnhancementJobManager.
// However one could add this as an optional feature to the
@@ -163,6 +178,12 @@ public class EventJobManagerImpl impleme
}
}
if(!job.isFinished()){
+ log.warn("Execution finished, but Job is not finished!");
+ EnhancementJobHandler.logJobInfo(log, job, null, true);
+ log.warn("ExecutionMetadata: ");
+ for(Iterator<Triple> it = job.getExecutionMetadata().iterator();
+ it.hasNext();
+ log.warn(it.next().toString()));
throw new ChainException("EnhancementJobManager was deactivated while" +
" enhancing the passed ContentItem "+job.getContentItem()+
" (EnhancementJobManager type: "+getClass()+")");