You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/19 12:08:55 UTC
svn commit: r718931 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./
broker/region/ broker/region/policy/ kaha/impl/async/ store/amq/
store/journal/ thread/
Author: gtully
Date: Wed Nov 19 03:08:54 2008
New Revision: 718931
URL: http://svn.apache.org/viewvc?rev=718931&view=rev
Log:
resolve AMQ-2005, Scheduler is now referenced by its users such that it cannot be gc'ed during normal operation
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Nov 19 03:08:54 2008
@@ -93,7 +93,7 @@
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
-
+ protected static final Scheduler scheduler = Scheduler.getInstance();
protected final ActiveMQSession session;
protected final ConsumerInfo info;
@@ -969,7 +969,7 @@
if (redeliveryDelay > 0) {
// Start up the delivery again a little later.
- Scheduler.executeAfterDelay(new Runnable() {
+ scheduler.executeAfterDelay(new Runnable() {
public void run() {
try {
if (started.get()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Nov 19 03:08:54 2008
@@ -148,6 +148,7 @@
}
private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
+ protected static final Scheduler scheduler = Scheduler.getInstance();
protected int acknowledgementMode;
protected final ActiveMQConnection connection;
@@ -779,7 +780,7 @@
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
- Scheduler.executeAfterDelay(new Runnable() {
+ scheduler.executeAfterDelay(new Runnable() {
public void run() {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Nov 19 03:08:54 2008
@@ -53,6 +53,8 @@
public abstract class PrefetchSubscription extends AbstractSubscription {
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
+ protected static final Scheduler scheduler = Scheduler.getInstance();
+
protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
protected int prefetchExtension;
@@ -109,7 +111,7 @@
dispatchPending();
}
if (pull.getTimeout() > 0) {
- Scheduler.executeAfterDelay(new Runnable() {
+ scheduler.executeAfterDelay(new Runnable() {
public void run() {
pullTimeout(dispatchCounterBeforePull);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Wed Nov 19 03:08:54 2008
@@ -42,7 +42,8 @@
public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
private static final int GC_INTERVAL = 1000;
-
+ protected static final Scheduler scheduler = Scheduler.getInstance();
+
// TODO: need to get a better synchronized linked list that has little
// contention between enqueuing and dequeuing
private final List<TimestampWrapper> buffer = Collections.synchronizedList(new LinkedList<TimestampWrapper>());
@@ -90,11 +91,11 @@
}
public void start() throws Exception {
- Scheduler.executePeriodically(gcTask, GC_INTERVAL);
+ scheduler.executePeriodically(gcTask, GC_INTERVAL);
}
public void stop() throws Exception {
- Scheduler.cancel(gcTask);
+ scheduler.cancel(gcTask);
}
public void gc() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Wed Nov 19 03:08:54 2008
@@ -75,6 +75,7 @@
public static final int PREFERED_DIFF = 1024 * 512;
private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
+ protected static Scheduler scheduler = Scheduler.getInstance();
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
@@ -191,7 +192,7 @@
cleanup();
}
};
- Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+ scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
}
public void lock() throws IOException {
@@ -326,7 +327,7 @@
if (!started) {
return;
}
- Scheduler.cancel(cleanupTask);
+ scheduler.cancel(cleanupTask);
accessorPool.close();
storeState(false);
appender.close();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Nov 19 03:08:54 2008
@@ -84,6 +84,7 @@
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
+ private static final Scheduler scheduler = Scheduler.getInstance();
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
@@ -271,14 +272,14 @@
checkpoint(false);
}
};
- Scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
+ scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
periodicCleanupTask = new Runnable() {
public void run() {
cleanup();
}
};
- Scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
+ scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
if (lockAquired && lockLogged) {
LOG.info("Aquired lock for AMQ Store" + getDirectory());
@@ -301,8 +302,8 @@
}
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (this) {
- Scheduler.cancel(periodicCheckpointTask);
- Scheduler.cancel(periodicCleanupTask);
+ scheduler.cancel(periodicCheckpointTask);
+ scheduler.cancel(periodicCleanupTask);
}
Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
while (queueIterator.hasNext()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Nov 19 03:08:54 2008
@@ -83,6 +83,7 @@
*/
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
+ protected static final Scheduler scheduler = Scheduler.getInstance();
private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
private final Journal journal;
@@ -230,7 +231,7 @@
recover();
// Do a checkpoint periodically.
- Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
+ scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
}
@@ -241,7 +242,7 @@
return;
}
- Scheduler.cancel(periodicCheckpointTask);
+ scheduler.cancel(periodicCheckpointTask);
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Wed Nov 19 03:08:54 2008
@@ -21,25 +21,33 @@
import java.util.TimerTask;
/**
+ * Singelton, references maintained by users
* @version $Revision$
*/
-public final class Scheduler {
+public final class Scheduler {
+ private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
+ private final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
+ private static Scheduler instance;
+
+ static {
+ instance = new Scheduler();
+ }
-
- public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
- private static final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
-
private Scheduler() {
}
- public static synchronized void executePeriodically(final Runnable task, long period) {
+ public static Scheduler getInstance() {
+ return instance;
+ }
+
+ public synchronized void executePeriodically(final Runnable task, long period) {
TimerTask timerTask = new SchedulerTimerTask(task);
CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period);
TIMER_TASKS.put(task, timerTask);
}
- public static synchronized void cancel(Runnable task) {
+ public synchronized void cancel(Runnable task) {
TimerTask ticket = TIMER_TASKS.remove(task);
if (ticket != null) {
ticket.cancel();
@@ -47,13 +55,12 @@
}
}
- public static void executeAfterDelay(final Runnable task, long redeliveryDelay) {
+ public void executeAfterDelay(final Runnable task, long redeliveryDelay) {
TimerTask timerTask = new SchedulerTimerTask(task);
CLOCK_DAEMON.schedule(timerTask, redeliveryDelay);
}
- public static void shutdown() {
+ public void shutdown() {
CLOCK_DAEMON.cancel();
}
-
}