You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/05/03 10:50:11 UTC
svn commit: r1478672 -
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Author: cziegeler
Date: Fri May 3 08:50:11 2013
New Revision: 1478672
URL: http://svn.apache.org/r1478672
Log:
SLING-2829 : Don't close queue if async jobs are running
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1478672&r1=1478671&r2=1478672&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri May 3 08:50:11 2013
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.EventUtil;
@@ -99,6 +100,9 @@ public abstract class AbstractJobQueue
/** Suspend lock. */
private final Object suspendLock = new Object();
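+ /** Number of jobs currently being processed asynchronously; the queue cannot be marked for removal while this is non-zero. */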
+ private final AtomicInteger asyncCounter = new AtomicInteger();
+
/**
* Start this queue
* @param name The queue name
@@ -123,7 +127,10 @@ public abstract class AbstractJobQueue
@Override
public String getStateInfo() {
synchronized ( this.suspendLock ) {
- return "isWaiting=" + this.isWaiting + ", markedForRemoval=" + this.markedForRemoval + ", suspendedSince=" + this.suspendedSince;
+ return "isWaiting=" + this.isWaiting +
+ ", markedForRemoval=" + this.markedForRemoval +
+ ", suspendedSince=" + this.suspendedSince +
+ ", asyncJobs=" + this.asyncCounter.get();
}
}
@@ -391,7 +398,7 @@ public abstract class AbstractJobQueue
}
protected boolean canBeMarkedForRemoval() {
- return this.isEmpty() && !this.isWaiting &&!this.isSuspended();
+ return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
}
/**
@@ -529,10 +536,10 @@ public abstract class AbstractJobQueue
if ( !asyncDone.get() ) {
asyncDone.set(true);
finishedJob(job.getId(), result, true);
+ asyncCounter.decrementAndGet();
} else {
throw new IllegalStateException("Job is already marked as processed");
}
- asyncLock.notify();
}
}
@@ -566,6 +573,7 @@ public abstract class AbstractJobQueue
}
}
if ( result == JobConsumer.JobResult.ASYNC ) {
+ asyncCounter.incrementAndGet();
notifyFinished(null);
}
}