You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/06/05 17:55:39 UTC
svn commit: r1489934 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: ./ queues/ stats/
Author: cziegeler
Date: Wed Jun 5 15:55:39 2013
New Revision: 1489934
URL: http://svn.apache.org/r1489934
Log:
SLING-2906 : Queue might be outdated and closed while still processing
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Jun 5 15:55:39 2013
@@ -205,8 +205,6 @@ public class JobManagerImpl
final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
while ( i.hasNext() ) {
final AbstractJobQueue jbq = i.next();
- // update mbeans
- ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
jbq.close();
// update mbeans
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
@@ -244,19 +242,14 @@ public class JobManagerImpl
while ( i.hasNext() ) {
final Map.Entry<String, AbstractJobQueue> current = i.next();
final AbstractJobQueue jbq = current.getValue();
- if ( jbq.isMarkedForRemoval() ) {
- logger.debug("Removing idle Job Queue {}", jbq);
- // close
- jbq.close();
+ if ( jbq.tryToClose() ) {
+ logger.debug("Removing idle job queue {}", jbq);
// copy statistics
this.baseStatistics.add(jbq);
// remove
i.remove();
// update mbeans
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
- } else {
- // mark to be removed during next cycle
- jbq.markForRemoval();
}
}
}
@@ -377,10 +370,7 @@ public class JobManagerImpl
// remove the queue with the old name
this.queues.remove(queue.getName());
// check if we can close or have to rename
- queue.markForRemoval();
- if ( queue.isMarkedForRemoval() ) {
- // close
- queue.close();
+ if ( queue.tryToClose() ) {
// copy statistics
this.baseStatistics.add(queue);
// update mbeans
@@ -388,7 +378,12 @@ public class JobManagerImpl
} else {
queue.outdate();
// readd with new name
- this.queues.put(queue.getName(), queue);
+ String newName = queue.getName();
+ int index = 0;
+ while ( this.queues.containsKey(newName) ) {
+ newName = queue.getName() + '$' + String.valueOf(index++);
+ }
+ this.queues.put(newName, queue);
// update mbeans
((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue));
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Jun 5 15:55:39 2013
@@ -82,9 +82,6 @@ public abstract class AbstractJobQueue
/** Are we still running? */
protected volatile boolean running;
- /** Are we marked for removal */
- private volatile boolean markedForRemoval = false;
-
/** Is the queue currently waiting(sleeping) */
protected volatile boolean isWaiting = false;
@@ -131,7 +128,6 @@ public abstract class AbstractJobQueue
public String getStateInfo() {
synchronized ( this.suspendLock ) {
return "isWaiting=" + this.isWaiting +
- ", markedForRemoval=" + this.markedForRemoval +
", suspendedSince=" + this.suspendedSince +
", asyncJobs=" + this.asyncCounter.get();
}
@@ -151,7 +147,7 @@ public abstract class AbstractJobQueue
try {
runJobQueue();
- } catch (Throwable t) { //NOSONAR
+ } catch (final Throwable t) { //NOSONAR
logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
}
}
@@ -176,7 +172,6 @@ public abstract class AbstractJobQueue
public void close() {
this.running = false;
this.logger.debug("Shutting down job queue {}", queueName);
- this.logger.debug("Waking up sleeping queue {}", queueName);
this.resume();
if ( this.isWaiting ) {
this.logger.debug("Waking up waiting queue {}", this.queueName);
@@ -195,6 +190,24 @@ public abstract class AbstractJobQueue
}
/**
+ * Check if the queue can be closed
+ */
+ public boolean tryToClose() {
+ // resume the queue as we want to close it!
+ this.resume();
+ // check if possible
+ if ( this.canBeClosed() ) {
+ this.close();
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean canBeClosed() {
+ return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
+ }
+
+ /**
* Periodically check for started jobs without an acknowledge.
*/
public void checkForUnprocessedJobs() {
@@ -400,32 +413,6 @@ public abstract class AbstractJobQueue
notifyFinished(reprocessInfo);
}
- protected boolean canBeMarkedForRemoval() {
- return this.isEmpty() && !this.isWaiting && !this.isSuspended() && this.asyncCounter.get() == 0;
- }
-
- /**
- * Mark this queue for removal.
- */
- public void markForRemoval() {
- if ( this.canBeMarkedForRemoval() ) {
- this.markedForRemoval = true;
- }
- }
-
- /**
- * Check if this queue is marked for removal
- */
- public boolean isMarkedForRemoval() {
- if ( this.markedForRemoval ) {
- if ( this.canBeMarkedForRemoval() ) {
- return true;
- }
- this.markedForRemoval = false;
- }
- return false;
- }
-
/**
* Get the name of the job queue.
*/
@@ -691,6 +678,7 @@ public abstract class AbstractJobQueue
public void resume() {
synchronized ( this.suspendLock ) {
if ( this.suspendedSince != -1 ) {
+ this.logger.debug("Waking up suspended queue {}", queueName);
this.suspendedSince = -1;
this.suspendLock.notify();
}
@@ -704,6 +692,7 @@ public abstract class AbstractJobQueue
public void suspend() {
synchronized ( this.suspendLock ) {
if ( this.suspendedSince == -1 ) {
+ this.logger.debug("Suspending queue {}", queueName);
this.suspendedSince = System.currentTimeMillis();
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Wed Jun 5 15:55:39 2013
@@ -106,8 +106,8 @@ public abstract class AbstractParallelJo
}
@Override
- protected boolean canBeMarkedForRemoval() {
- boolean result = super.canBeMarkedForRemoval();
+ protected boolean canBeClosed() {
+ boolean result = super.canBeClosed();
if ( result ) {
result = this.jobCount == 0;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Wed Jun 5 15:55:39 2013
@@ -46,6 +46,8 @@ public final class OrderedJobQueue exten
/** Lock and status object for handling the sleep phase. */
private final SleepLock sleepLock = new SleepLock();
+ private boolean isWaitingForNext = false;
+
/** The queue - we use a set which is sorted by job creation date. */
private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>() {
@@ -75,7 +77,16 @@ public final class OrderedJobQueue exten
@Override
public String getStateInfo() {
- return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince;
+ return super.getStateInfo() + ", isSleepingUntil=" + this.sleepLock.sleepingSince + ", isWaitingForNext=" + this.isWaitingForNext;
+ }
+
+ @Override
+ protected boolean canBeClosed() {
+ boolean result = super.canBeClosed();
+ if ( result ) {
+ result = this.isWaitingForNext;
+ }
+ return result;
}
@Override
@@ -124,6 +135,7 @@ public final class OrderedJobQueue exten
synchronized ( this.queue ) {
this.queue.add(handler);
this.queue.notify();
+ this.isWaitingForNext = false;
}
}
@@ -131,6 +143,7 @@ public final class OrderedJobQueue exten
protected JobHandler take() {
synchronized ( this.queue ) {
while ( this.queue.isEmpty() ) {
+ this.isWaitingForNext = true;
try {
this.queue.wait();
} catch (final InterruptedException e) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Wed Jun 5 15:55:39 2013
@@ -39,6 +39,8 @@ public final class ParallelJobQueue exte
/** The queue. */
private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>();
+ private boolean isWaitingForNext = false;
+
public ParallelJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
@@ -48,8 +50,23 @@ public final class ParallelJobQueue exte
}
@Override
+ public String getStateInfo() {
+ return super.getStateInfo() + ", isWaitingForNext=" + this.isWaitingForNext;
+ }
+
+ @Override
+ protected boolean canBeClosed() {
+ boolean result = super.canBeClosed();
+ if ( result ) {
+ result = this.isWaitingForNext;
+ }
+ return result;
+ }
+
+ @Override
protected void put(final JobHandler event) {
try {
+ this.isWaitingForNext = false;
this.queue.put(event);
} catch (final InterruptedException e) {
// this should never happen
@@ -60,10 +77,13 @@ public final class ParallelJobQueue exte
@Override
protected JobHandler take() {
try {
+ this.isWaitingForNext = true;
return this.queue.take();
} catch (final InterruptedException e) {
// this should never happen
this.ignoreException(e);
+ } finally {
+ this.isWaitingForNext = false;
}
return null;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Wed Jun 5 15:55:39 2013
@@ -66,8 +66,8 @@ public final class TopicRoundRobinJobQue
}
@Override
- protected boolean canBeMarkedForRemoval() {
- boolean result = super.canBeMarkedForRemoval();
+ protected boolean canBeClosed() {
+ boolean result = super.canBeClosed();
if ( result ) {
result = this.isWaitingForNext;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java?rev=1489934&r1=1489933&r2=1489934&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java Wed Jun 5 15:55:39 2013
@@ -188,7 +188,7 @@ public class StatisticsImpl implements S
}
/**
- * New job in the qeue
+ * New job in the queue
*/
public synchronized void incQueued() {
this.queuedJobs++;