You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/19 12:08:55 UTC

svn commit: r718931 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/region/ broker/region/policy/ kaha/impl/async/ store/amq/ store/journal/ thread/

Author: gtully
Date: Wed Nov 19 03:08:54 2008
New Revision: 718931

URL: http://svn.apache.org/viewvc?rev=718931&view=rev
Log:
resolve AMQ-2005, Scheduler is now referenced by its users such that it cannot be gc'ed during normal operation

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Nov 19 03:08:54 2008
@@ -93,7 +93,7 @@
 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
 
     private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
-
+    protected static final Scheduler scheduler = Scheduler.getInstance();
     protected final ActiveMQSession session;
     protected final ConsumerInfo info;
 
@@ -969,7 +969,7 @@
     
                     if (redeliveryDelay > 0) {
                         // Start up the delivery again a little later.
-                        Scheduler.executeAfterDelay(new Runnable() {
+                        scheduler.executeAfterDelay(new Runnable() {
                             public void run() {
                                 try {
                                     if (started.get()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Nov 19 03:08:54 2008
@@ -148,6 +148,7 @@
     }
 
     private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
+    protected static final Scheduler scheduler = Scheduler.getInstance();
 
     protected int acknowledgementMode;
     protected final ActiveMQConnection connection;
@@ -779,7 +780,7 @@
                                 for (int i = 0; i < redeliveryCounter; i++) {
                                     redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                                 }
-                                Scheduler.executeAfterDelay(new Runnable() {
+                                scheduler.executeAfterDelay(new Runnable() {
 
                                     public void run() {
                                         ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Nov 19 03:08:54 2008
@@ -53,6 +53,8 @@
 public abstract class PrefetchSubscription extends AbstractSubscription {
 
     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
+    protected static final Scheduler scheduler = Scheduler.getInstance();
+    
     protected PendingMessageCursor pending;
     protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
     protected int prefetchExtension;
@@ -109,7 +111,7 @@
 	                    dispatchPending();
 	                }
 	                if (pull.getTimeout() > 0) {
-	                    Scheduler.executeAfterDelay(new Runnable() {
+	                    scheduler.executeAfterDelay(new Runnable() {
 	
 	                        public void run() {
 	                            pullTimeout(dispatchCounterBeforePull);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Wed Nov 19 03:08:54 2008
@@ -42,7 +42,8 @@
 public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
 
     private static final int GC_INTERVAL = 1000;
-
+    protected static final Scheduler scheduler = Scheduler.getInstance();
+    
     // TODO: need to get a better synchronized linked list that has little
     // contention between enqueuing and dequeuing
     private final List<TimestampWrapper> buffer = Collections.synchronizedList(new LinkedList<TimestampWrapper>());
@@ -90,11 +91,11 @@
     }
 
     public void start() throws Exception {
-        Scheduler.executePeriodically(gcTask, GC_INTERVAL);
+        scheduler.executePeriodically(gcTask, GC_INTERVAL);
     }
 
     public void stop() throws Exception {
-        Scheduler.cancel(gcTask);
+        scheduler.cancel(gcTask);
     }
 
     public void gc() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Wed Nov 19 03:08:54 2008
@@ -75,6 +75,7 @@
     public static final int PREFERED_DIFF = 1024 * 512;
 
     private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
+    protected static Scheduler scheduler  = Scheduler.getInstance();
 
     protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
 
@@ -191,7 +192,7 @@
                 cleanup();
             }
         };
-        Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+        scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
     }
 
     public void lock() throws IOException {
@@ -326,7 +327,7 @@
         if (!started) {
             return;
         }
-        Scheduler.cancel(cleanupTask);
+        scheduler.cancel(cleanupTask);
         accessorPool.close();
         storeState(false);
         appender.close();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Nov 19 03:08:54 2008
@@ -84,6 +84,7 @@
 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
 
     private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
+    private static final Scheduler scheduler = Scheduler.getInstance();
     private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
     private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
     private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
@@ -271,14 +272,14 @@
                 checkpoint(false);
             }
         };
-        Scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
+        scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
         periodicCleanupTask = new Runnable() {
 
             public void run() {
                 cleanup();
             }
         };
-        Scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
+        scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
         
         if (lockAquired && lockLogged) {
             LOG.info("Aquired lock for AMQ Store" + getDirectory());
@@ -301,8 +302,8 @@
         }
         this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (this) {
-            Scheduler.cancel(periodicCheckpointTask);
-            Scheduler.cancel(periodicCleanupTask);
+            scheduler.cancel(periodicCheckpointTask);
+            scheduler.cancel(periodicCleanupTask);
         }
         Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
         while (queueIterator.hasNext()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Nov 19 03:08:54 2008
@@ -83,6 +83,7 @@
  */
 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
 
+    protected static final Scheduler scheduler = Scheduler.getInstance();
     private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
 
     private final Journal journal;
@@ -230,7 +231,7 @@
         recover();
 
         // Do a checkpoint periodically.
-        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
+        scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
 
     }
 
@@ -241,7 +242,7 @@
             return;
         }
 
-        Scheduler.cancel(periodicCheckpointTask);
+        scheduler.cancel(periodicCheckpointTask);
 
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true, true);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=718931&r1=718930&r2=718931&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Wed Nov 19 03:08:54 2008
@@ -21,25 +21,33 @@
 import java.util.TimerTask;
 
 /**
+ * Singleton, references maintained by users
  * @version $Revision$
  */
-public final class Scheduler {
+public final class Scheduler { 
 
+	private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
+    private final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
+    private static Scheduler instance;
+    
+    static {
+        instance = new Scheduler();
+    }
     
-
-	public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
-    private static final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
-
     private Scheduler() {
     }
 
-    public static synchronized void executePeriodically(final Runnable task, long period) {
+    public static Scheduler getInstance() {
+        return instance;
+    }
+    
+    public synchronized void executePeriodically(final Runnable task, long period) {
     	TimerTask timerTask = new SchedulerTimerTask(task);
         CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period);
         TIMER_TASKS.put(task, timerTask);
     }
 
-    public static synchronized void cancel(Runnable task) {
+    public synchronized void cancel(Runnable task) {
     	TimerTask ticket = TIMER_TASKS.remove(task);
         if (ticket != null) {
             ticket.cancel();
@@ -47,13 +55,12 @@
         }
     }
 
-    public static void executeAfterDelay(final Runnable task, long redeliveryDelay) {
+    public void executeAfterDelay(final Runnable task, long redeliveryDelay) {
     	TimerTask timerTask = new SchedulerTimerTask(task);
         CLOCK_DAEMON.schedule(timerTask, redeliveryDelay);
     }
     
-    public static void shutdown() {
+    public void shutdown() {
         CLOCK_DAEMON.cancel();
     }
-
 }