You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/19 22:09:41 UTC

svn commit: r605698 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/thread/ main/java/org/apache/activemq/transport/ test/java/org/apache/activemq/perf/

Author: rajdavies
Date: Wed Dec 19 13:09:40 2007
New Revision: 605698

URL: http://svn.apache.org/viewvc?rev=605698&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1525

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=605698&r1=605697&r2=605698&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Wed Dec 19 13:09:40 2007
@@ -25,17 +25,7 @@
  */
 public final class Scheduler {
 
-    private static final class SchedulerTimerTask extends TimerTask {
-		private final Runnable task;
-
-		private SchedulerTimerTask(Runnable task) {
-			this.task = task;
-		}
-
-		public void run() {
-			task.run();							
-		}
-	}
+    
 
 	public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
     private static final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java?rev=605698&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java Wed Dec 19 13:09:40 2007
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.thread;
+
+import java.util.TimerTask;
+
+/**
+ * A TimeTask for a Runnable object
+ *
+ */
+public class SchedulerTimerTask extends TimerTask {
+    private final Runnable task;
+
+    public SchedulerTimerTask(Runnable task) {
+        this.task = task;
+    }
+
+    public void run() {
+        this.task.run();                         
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=605698&r1=605697&r2=605698&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Wed Dec 19 13:09:40 2007
@@ -17,11 +17,17 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.util.Timer;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.SchedulerTimerTask;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -34,7 +40,9 @@
 public class InactivityMonitor extends TransportFilter {
 
     private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
-
+    private static final ThreadPoolExecutor ASYNC_TASKS;
+    private static final Timer  READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck");
+    private static final Timer  WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck");
     private WireFormatInfo localWireFormatInfo;
     private WireFormatInfo remoteWireFormatInfo;
     private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
@@ -44,6 +52,8 @@
 
     private final AtomicBoolean commandReceived = new AtomicBoolean(true);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
+    private SchedulerTimerTask writeCheckerTask;
+    private SchedulerTimerTask readCheckerTask;
 
     private final Runnable readChecker = new Runnable() {
         long lastRunTime;
@@ -51,6 +61,7 @@
             long now = System.currentTimeMillis();
             if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
                 LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check.");
+              
             }
             lastRunTime = now; 
             readCheck();
@@ -62,7 +73,8 @@
         public void run() {
             long now = System.currentTimeMillis();
             if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
-                LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check.");
+                LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
+                
             }
             lastRunTime = now; 
             writeCheck();
@@ -80,14 +92,17 @@
 
     final void writeCheck() {
             if (inSend.get()) {
-            LOG.trace("A send is in progress");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A send is in progress");
+            }
             return;
         }
 
         if (!commandSent.get()) {
-            LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
-            // TODO: use a thread pool for this..
-            Thread thread = new Thread("ActiveMQ: Activity Generator: "+next.getRemoteAddress()) {
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
+            }
+            ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
                     try {
                         oneway(new KeepAliveInfo());
@@ -95,11 +110,11 @@
                         onException(e);
                     }
                 };
-            };
-            thread.setDaemon(true);
-            thread.start();
+            });
         } else {
-            LOG.trace("Message sent since last write check, resetting flag");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message sent since last write check, resetting flag");
+            }
         }
 
         commandSent.set(false);
@@ -107,29 +122,34 @@
 
     final void readCheck() {
         if (inReceive.get()) {
-            LOG.trace("A receive is in progress");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A receive is in progress");
+            }
             return;
         }
-
         if (!commandReceived.get()) {
-            LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            }
+           
 
             // TODO: use a thread pool for this..
-            Thread thread = new Thread("ActiveMQ: Inactivity Handler: "+next.getRemoteAddress()) {
+            ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
                         onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
                 };
-            };
-            thread.setDaemon(true);
-            thread.start();
+            });
 
         } else {
-            LOG.trace("Message received since last read check, resetting flag: ");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message received since last read check, resetting flag: ");
+            }
         }
         commandReceived.set(false);
     }
 
     public void onCommand(Object command) {
+        commandReceived.set(true);
         inReceive.set(true);
         try {
             if (command.getClass() == WireFormatInfo.class) {
@@ -150,7 +170,7 @@
                 transportListener.onCommand(command);
             }
         } finally {
-            commandReceived.set(true);
+            
             inReceive.set(false);
         }
     }
@@ -192,11 +212,14 @@
             return;
         }
 
-        long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
-        if (l > 0) {
+        long checkTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+        if (checkTime > 0) {
             monitorStarted.set(true);
-            Scheduler.executePeriodically(writeChecker, l / 2);
-            Scheduler.executePeriodically(readChecker, l);
+            writeCheckerTask = new SchedulerTimerTask(writeChecker);
+            readCheckerTask = new  SchedulerTimerTask(readChecker);
+            long writeCheckTime = checkTime/3;
+            WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, writeCheckTime,writeCheckTime);
+            READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, checkTime,checkTime);
         }
     }
 
@@ -205,9 +228,22 @@
      */
     private synchronized void stopMonitorThreads() {
         if (monitorStarted.compareAndSet(true, false)) {
-            Scheduler.cancel(readChecker);
-            Scheduler.cancel(writeChecker);
-        }
+            readCheckerTask.cancel();
+            writeCheckerTask.cancel();
+            WRITE_CHECK_TIMER.purge();
+            READ_CHECK_TIMER.purge();
+        }
+    }
+    
+       
+    static {
+        ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
     }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=605698&r1=605697&r2=605698&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Wed Dec 19 13:09:40 2007
@@ -47,7 +47,7 @@
     protected int samepleCount = 20;
     protected long sampleInternal = 10000;
     protected int numberOfConsumers = 1;
-    protected int numberofProducers = 2;
+    protected int numberofProducers = 0;
     protected int playloadSize = 1024;
     protected byte[] array;
     protected ConnectionFactory factory;
@@ -164,8 +164,12 @@
             totalRate += rate.getRate();
             totalCount += rate.getTotalCount();
         }
-        int avgRate = totalRate / producers.length;
-        System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount);
+        if (producers != null && producers.length > 0) {
+            int avgRate = totalRate / producers.length;
+            System.out.println("Avg producer rate = " + avgRate
+                    + " msg/sec | Total rate = " + totalRate + ", sent = "
+                    + totalCount);
+        }
     }
 
     protected void dumpConsumerRate() {