You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/19 22:09:41 UTC
svn commit: r605698 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/thread/
main/java/org/apache/activemq/transport/ test/java/org/apache/activemq/perf/
Author: rajdavies
Date: Wed Dec 19 13:09:40 2007
New Revision: 605698
URL: http://svn.apache.org/viewvc?rev=605698&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1525
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=605698&r1=605697&r2=605698&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Wed Dec 19 13:09:40 2007
@@ -25,17 +25,7 @@
*/
public final class Scheduler {
- private static final class SchedulerTimerTask extends TimerTask {
- private final Runnable task;
-
- private SchedulerTimerTask(Runnable task) {
- this.task = task;
- }
-
- public void run() {
- task.run();
- }
- }
+
public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
private static final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java?rev=605698&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java Wed Dec 19 13:09:40 2007
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.thread;
+
+import java.util.TimerTask;
+
+/**
+ * A TimerTask for a Runnable object
+ *
+ */
+public class SchedulerTimerTask extends TimerTask {
+ private final Runnable task;
+
+ public SchedulerTimerTask(Runnable task) {
+ this.task = task;
+ }
+
+ public void run() {
+ this.task.run();
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SchedulerTimerTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=605698&r1=605697&r2=605698&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Wed Dec 19 13:09:40 2007
@@ -17,11 +17,17 @@
package org.apache.activemq.transport;
import java.io.IOException;
+import java.util.Timer;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +40,9 @@
public class InactivityMonitor extends TransportFilter {
private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
-
+ private static final ThreadPoolExecutor ASYNC_TASKS;
+ private static final Timer READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck");
+ private static final Timer WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck");
private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
@@ -44,6 +52,8 @@
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
+ private SchedulerTimerTask writeCheckerTask;
+ private SchedulerTimerTask readCheckerTask;
private final Runnable readChecker = new Runnable() {
long lastRunTime;
@@ -51,6 +61,7 @@
long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check.");
+
}
lastRunTime = now;
readCheck();
@@ -62,7 +73,8 @@
public void run() {
long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
- LOG.debug(""+(now-lastRunTime)+" ms elapsed since last read check.");
+ LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
+
}
lastRunTime = now;
writeCheck();
@@ -80,14 +92,17 @@
final void writeCheck() {
if (inSend.get()) {
- LOG.trace("A send is in progress");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("A send is in progress");
+ }
return;
}
if (!commandSent.get()) {
- LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
- // TODO: use a thread pool for this..
- Thread thread = new Thread("ActiveMQ: Activity Generator: "+next.getRemoteAddress()) {
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
+ }
+ ASYNC_TASKS.execute(new Runnable() {
public void run() {
try {
oneway(new KeepAliveInfo());
@@ -95,11 +110,11 @@
onException(e);
}
};
- };
- thread.setDaemon(true);
- thread.start();
+ });
} else {
- LOG.trace("Message sent since last write check, resetting flag");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message sent since last write check, resetting flag");
+ }
}
commandSent.set(false);
@@ -107,29 +122,34 @@
final void readCheck() {
if (inReceive.get()) {
- LOG.trace("A receive is in progress");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("A receive is in progress");
+ }
return;
}
-
if (!commandReceived.get()) {
- LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+ }
+
// TODO: use a thread pool for this..
- Thread thread = new Thread("ActiveMQ: Inactivity Handler: "+next.getRemoteAddress()) {
+ ASYNC_TASKS.execute(new Runnable() {
public void run() {
onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
};
- };
- thread.setDaemon(true);
- thread.start();
+ });
} else {
- LOG.trace("Message received since last read check, resetting flag: ");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message received since last read check, resetting flag: ");
+ }
}
commandReceived.set(false);
}
public void onCommand(Object command) {
+ commandReceived.set(true);
inReceive.set(true);
try {
if (command.getClass() == WireFormatInfo.class) {
@@ -150,7 +170,7 @@
transportListener.onCommand(command);
}
} finally {
- commandReceived.set(true);
+
inReceive.set(false);
}
}
@@ -192,11 +212,14 @@
return;
}
- long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
- if (l > 0) {
+ long checkTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+ if (checkTime > 0) {
monitorStarted.set(true);
- Scheduler.executePeriodically(writeChecker, l / 2);
- Scheduler.executePeriodically(readChecker, l);
+ writeCheckerTask = new SchedulerTimerTask(writeChecker);
+ readCheckerTask = new SchedulerTimerTask(readChecker);
+ long writeCheckTime = checkTime/3;
+ WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, writeCheckTime,writeCheckTime);
+ READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, checkTime,checkTime);
}
}
@@ -205,9 +228,22 @@
*/
private synchronized void stopMonitorThreads() {
if (monitorStarted.compareAndSet(true, false)) {
- Scheduler.cancel(readChecker);
- Scheduler.cancel(writeChecker);
- }
+ readCheckerTask.cancel();
+ writeCheckerTask.cancel();
+ WRITE_CHECK_TIMER.purge();
+ READ_CHECK_TIMER.purge();
+ }
+ }
+
+
+ static {
+ ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=605698&r1=605697&r2=605698&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Wed Dec 19 13:09:40 2007
@@ -47,7 +47,7 @@
protected int samepleCount = 20;
protected long sampleInternal = 10000;
protected int numberOfConsumers = 1;
- protected int numberofProducers = 2;
+ protected int numberofProducers = 0;
protected int playloadSize = 1024;
protected byte[] array;
protected ConnectionFactory factory;
@@ -164,8 +164,12 @@
totalRate += rate.getRate();
totalCount += rate.getTotalCount();
}
- int avgRate = totalRate / producers.length;
- System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount);
+ if (producers != null && producers.length > 0) {
+ int avgRate = totalRate / producers.length;
+ System.out.println("Avg producer rate = " + avgRate
+ + " msg/sec | Total rate = " + totalRate + ", sent = "
+ + totalCount);
+ }
}
protected void dumpConsumerRate() {