You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/05/20 14:02:11 UTC
svn commit: r946600 [2/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Thu May 20 12:02:10 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.store.amq;
import java.io.File;
-
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.store.PersistenceAdapter;
@@ -35,7 +34,6 @@ import org.apache.activemq.util.IOHelper
*/
public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024;
- private TaskRunnerFactory taskRunnerFactory;
private File dataDirectory;
private int journalThreadPriority = Thread.MAX_PRIORITY;
private String brokerName = "localhost";
@@ -56,6 +54,7 @@ public class AMQPersistenceAdapterFactor
private boolean forceRecoverReferenceStore=false;
private long checkpointInterval = 1000 * 20;
private boolean useDedicatedTaskRunner;
+ private TaskRunnerFactory taskRunnerFactory;
/**
@@ -82,6 +81,8 @@ public class AMQPersistenceAdapterFactor
result.setMaxReferenceFileLength(getMaxReferenceFileLength());
result.setForceRecoverReferenceStore(isForceRecoverReferenceStore());
result.setRecoverReferenceStore(isRecoverReferenceStore());
+ result.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
+ result.setJournalThreadPriority(getJournalThreadPriority());
return result;
}
@@ -122,10 +123,6 @@ public class AMQPersistenceAdapterFactor
* @return the taskRunnerFactory
*/
public TaskRunnerFactory getTaskRunnerFactory() {
- if (taskRunnerFactory == null) {
- taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority,
- true, 1000, isUseDedicatedTaskRunner());
- }
return taskRunnerFactory;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu May 20 12:02:10 2010
@@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactor
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
@@ -64,9 +63,9 @@ import org.apache.activemq.thread.Schedu
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
-import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@@ -85,7 +84,7 @@ public class JournalPersistenceAdapter i
private BrokerService brokerService;
- protected static final Scheduler scheduler = Scheduler.getInstance();
+ protected Scheduler scheduler;
private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
private Journal journal;
@@ -97,20 +96,20 @@ public class JournalPersistenceAdapter i
private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
private SystemUsage usageManager;
- private long checkpointInterval = 1000 * 60 * 5;
+ private final long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 1024 * 1024;
- private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
+ private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private boolean fullCheckPoint;
- private AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean started = new AtomicBoolean(false);
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
@@ -267,7 +266,9 @@ public class JournalPersistenceAdapter i
recover();
// Do a checkpoint periodically.
- scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
+ this.scheduler = new Scheduler("Journal Scheduler");
+ this.scheduler.start();
+ this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
}
@@ -278,7 +279,8 @@ public class JournalPersistenceAdapter i
return;
}
- scheduler.cancel(periodicCheckpointTask);
+ this.scheduler.cancel(periodicCheckpointTask);
+ this.scheduler.stop();
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true);
@@ -723,6 +725,7 @@ public class JournalPersistenceAdapter i
longTermPersistence.setBrokerName(brokerName);
}
+ @Override
public String toString() {
return "JournalPersistenceAdapator(" + longTermPersistence + ")";
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu May 20 12:02:10 2010
@@ -415,6 +415,7 @@ public class KahaDBPersistenceAdapter im
@Override
public String toString() {
- return "KahaDBPersistenceAdapter";
+ String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
+ return "KahaDBPersistenceAdapter[" + path +"]" ;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu May 20 12:02:10 2010
@@ -72,6 +72,7 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
@@ -94,6 +95,7 @@ public class KahaDBStore extends Message
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
+ private Scheduler scheduler;
public KahaDBStore() {
@@ -155,6 +157,7 @@ public class KahaDBStore extends Message
@Override
public void doStart() throws Exception {
+ super.doStart();
this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
@@ -175,8 +178,6 @@ public class KahaDBStore extends Message
return thread;
}
});
- super.doStart();
-
}
@Override
@@ -204,6 +205,7 @@ public class KahaDBStore extends Message
protected void addQueueTask(StoreQueueTask task) throws IOException {
try {
this.queueSemaphore.acquire();
+
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
@@ -327,7 +329,6 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu May 20 12:02:10 2010
@@ -82,7 +82,7 @@ import org.apache.kahadb.util.VariableMa
public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
- private BrokerService brokerService;
+ protected BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -245,7 +245,6 @@ public class MessageDatabase extends Ser
// to see if we need to exit this thread.
long sleepTime = Math.min(checkpointInterval, 500);
while (opened.get()) {
-
Thread.sleep(sleepTime);
long now = System.currentTimeMillis();
if( now - lastCleanup >= cleanupInterval ) {
@@ -276,9 +275,7 @@ public class MessageDatabase extends Ser
public void open() throws IOException {
if( opened.compareAndSet(false, true) ) {
getJournal().start();
-
- loadPageFile();
-
+ loadPageFile();
startCheckpoint();
recover();
}
@@ -332,6 +329,11 @@ public class MessageDatabase extends Ser
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, true);
+ }
+ });
pageFile.unload();
metadata = new Metadata();
}
@@ -385,11 +387,12 @@ public class MessageDatabase extends Ser
*/
private void recover() throws IllegalStateException, IOException {
synchronized (indexMutex) {
- long start = System.currentTimeMillis();
-
+
+ long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
if( recoveryPosition!=null ) {
int redoCounter = 0;
+ LOG.info("Recoverying from the journal ...");
while (recoveryPosition != null) {
JournalCommand message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition;
@@ -398,7 +401,7 @@ public class MessageDatabase extends Ser
recoveryPosition = journal.getNextLocation(recoveryPosition);
}
long end = System.currentTimeMillis();
- LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+ LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
}
// We may have to undo some index updates.
@@ -693,7 +696,7 @@ public class MessageDatabase extends Ser
// from the recovery method too so they need to be idempotent
// /////////////////////////////////////////////////////////////////
- private void process(JournalCommand data, final Location location) throws IOException {
+ void process(JournalCommand data, final Location location) throws IOException {
data.visit(new Visitor() {
@Override
public void visit(KahaAddMessageCommand command) throws IOException {
@@ -732,11 +735,12 @@ public class MessageDatabase extends Ser
});
}
- private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
+ protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
synchronized (indexMutex) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location));
+ TransactionId key = key(command.getTransactionInfo());
}
} else {
synchronized (indexMutex) {
@@ -836,7 +840,7 @@ public class MessageDatabase extends Ser
protected final Object indexMutex = new Object();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
- private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
+ void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
// Skip adding the message to the index if this is a topic and there are
@@ -870,7 +874,7 @@ public class MessageDatabase extends Ser
}
- private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
+ void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
if (!command.hasSubscriptionKey()) {
@@ -902,7 +906,7 @@ public class MessageDatabase extends Ser
}
}
- private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
+ void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.clear(tx);
sd.orderIndex.unload(tx);
@@ -931,7 +935,7 @@ public class MessageDatabase extends Ser
metadata.destinations.remove(tx, key);
}
- private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
+ void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
// If set then we are creating it.. otherwise we are destroying the sub
@@ -961,8 +965,7 @@ public class MessageDatabase extends Ser
* @param tx
* @throws IOException
*/
- private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
-
+ void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
LOG.debug("Checkpoint started.");
metadata.state = OPEN_STATE;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Thu May 20 12:02:10 2010
@@ -19,32 +19,25 @@ package org.apache.activemq.thread;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
/**
- * Singelton, references maintained by users
* @version $Revision$
*/
-public final class Scheduler {
-
- private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
- private final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
- private static Scheduler instance;
-
- static {
- instance = new Scheduler();
- }
+public final class Scheduler extends ServiceSupport {
+ private final String name;
+ private Timer timer;
+ private final HashMap<Runnable, TimerTask> timerTasks = new HashMap<Runnable, TimerTask>();
- private Scheduler() {
- }
-
- public static Scheduler getInstance() {
- return instance;
+ public Scheduler (String name) {
+ this.name = name;
}
-
- public synchronized void executePeriodically(final Runnable task, long period) {
+
+ public void executePeriodically(final Runnable task, long period) {
TimerTask timerTask = new SchedulerTimerTask(task);
- CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period);
- TIMER_TASKS.put(task, timerTask);
+ timer.scheduleAtFixedRate(timerTask, period, period);
+ timerTasks.put(task, timerTask);
}
/*
@@ -53,24 +46,38 @@ public final class Scheduler {
*/
public synchronized void schedualPeriodically(final Runnable task, long period) {
TimerTask timerTask = new SchedulerTimerTask(task);
- CLOCK_DAEMON.schedule(timerTask, period, period);
- TIMER_TASKS.put(task, timerTask);
+ timer.schedule(timerTask, period, period);
+ timerTasks.put(task, timerTask);
}
public synchronized void cancel(Runnable task) {
- TimerTask ticket = TIMER_TASKS.remove(task);
+ TimerTask ticket = timerTasks.remove(task);
if (ticket != null) {
ticket.cancel();
- CLOCK_DAEMON.purge();//remove cancelled TimerTasks
+ timer.purge();//remove cancelled TimerTasks
}
}
- public void executeAfterDelay(final Runnable task, long redeliveryDelay) {
+ public synchronized void executeAfterDelay(final Runnable task, long redeliveryDelay) {
TimerTask timerTask = new SchedulerTimerTask(task);
- CLOCK_DAEMON.schedule(timerTask, redeliveryDelay);
+ timer.schedule(timerTask, redeliveryDelay);
}
public void shutdown() {
- CLOCK_DAEMON.cancel();
+ timer.cancel();
+ }
+
+ @Override
+ protected synchronized void doStart() throws Exception {
+ this.timer = new Timer(name, true);
+
+ }
+
+ @Override
+ protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+ if (this.timer != null) {
+ this.timer.cancel();
+ }
+
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu May 20 12:02:10 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.usage;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStore;
@@ -36,6 +37,7 @@ public class SystemUsage implements Serv
private MemoryUsage memoryUsage;
private StoreUsage storeUsage;
private TempUsage tempUsage;
+ private ThreadPoolExecutor executor;
/**
* True if someone called setSendFailIfNoSpace() on this particular usage
@@ -45,7 +47,7 @@ public class SystemUsage implements Serv
private boolean sendFailIfNoSpace;
private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
private long sendFailIfNoSpaceAfterTimeout = 0;
-
+
private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage() {
@@ -58,14 +60,21 @@ public class SystemUsage implements Serv
this.memoryUsage = new MemoryUsage(name + ":memory");
this.storeUsage = new StoreUsage(name + ":store", adapter);
this.tempUsage = new TempUsage(name + ":temp", tempStore);
+ this.memoryUsage.setExecutor(getExecutor());
+ this.storeUsage.setExecutor(getExecutor());
+ this.tempUsage.setExecutor(getExecutor());
}
public SystemUsage(SystemUsage parent, String name) {
this.parent = parent;
+ this.executor = parent.getExecutor();
this.name = name;
this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory");
this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store");
this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp");
+ this.memoryUsage.setExecutor(getExecutor());
+ this.storeUsage.setExecutor(getExecutor());
+ this.tempUsage.setExecutor(getExecutor());
}
public String getName() {
@@ -186,6 +195,7 @@ public class SystemUsage implements Serv
memoryUsage.setParent(parent.memoryUsage);
}
this.memoryUsage = memoryUsage;
+ this.memoryUsage.setExecutor(getExecutor());
}
public void setStoreUsage(StoreUsage storeUsage) {
@@ -199,6 +209,7 @@ public class SystemUsage implements Serv
storeUsage.setParent(parent.storeUsage);
}
this.storeUsage = storeUsage;
+ this.storeUsage.setExecutor(executor);
}
@@ -213,5 +224,30 @@ public class SystemUsage implements Serv
tempDiskUsage.setParent(parent.tempUsage);
}
this.tempUsage = tempDiskUsage;
+ this.tempUsage.setExecutor(getExecutor());
+ }
+
+ /**
+ * @return the executor
+ */
+ public ThreadPoolExecutor getExecutor() {
+ return this.executor;
+ }
+
+ /**
+ * @param executor
+ * the executor to set
+ */
+ public void setExecutor(ThreadPoolExecutor executor) {
+ this.executor = executor;
+ if (this.memoryUsage != null) {
+ this.memoryUsage.setExecutor(this.executor);
+ }
+ if (this.storeUsage != null) {
+ this.storeUsage.setExecutor(this.executor);
+ }
+ if (this.tempUsage != null) {
+ this.tempUsage.setExecutor(this.executor);
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu May 20 12:02:10 2010
@@ -21,13 +21,8 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,7 +38,6 @@ import org.apache.commons.logging.LogFac
public abstract class Usage<T extends Usage> implements Service {
private static final Log LOG = LogFactory.getLog(Usage.class);
- private static ThreadPoolExecutor executor;
protected final Object usageMutex = new Object();
protected int percentUsage;
protected T parent;
@@ -53,12 +47,11 @@ public abstract class Usage<T extends Us
private final boolean debug = LOG.isDebugEnabled();
private String name;
private float usagePortion = 1.0f;
- private List<T> children = new CopyOnWriteArrayList<T>();
+ private final List<T> children = new CopyOnWriteArrayList<T>();
private final List<Runnable> callbacks = new LinkedList<Runnable>();
private int pollingTime = 100;
-
- private AtomicBoolean started=new AtomicBoolean();
-
+ private final AtomicBoolean started=new AtomicBoolean();
+ private ThreadPoolExecutor executor;
public Usage(T parent, String name, float portion) {
this.parent = parent;
this.usagePortion = portion;
@@ -289,6 +282,7 @@ public abstract class Usage<T extends Us
return name;
}
+ @Override
public String toString() {
return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
}
@@ -411,18 +405,10 @@ public abstract class Usage<T extends Us
this.parent = parent;
}
- protected Executor getExecutor() {
- return executor;
+ public void setExecutor (ThreadPoolExecutor executor) {
+ this.executor = executor;
}
-
- static {
- executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "Usage Async Task");
- thread.setDaemon(true);
- return thread;
- }
- });
+ public ThreadPoolExecutor getExecutor() {
+ return executor;
}
-
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java Thu May 20 12:02:10 2010
@@ -16,14 +16,23 @@
*/
package org.apache.activemq.broker;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
public class BrokerRestartTestSupport extends BrokerTestSupport {
private PersistenceAdapter persistenceAdapter;
+ @Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
+ File dir = broker.getBrokerDataDirectory();
+ if (dir != null) {
+ IOHelper.deleteChildren(dir);
+ }
//broker.setPersistent(false);
broker.setDeleteAllMessagesOnStartup(true);
persistenceAdapter = broker.getPersistenceAdapter();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Thu May 20 12:02:10 2010
@@ -17,13 +17,14 @@
package org.apache.activemq.bugs;
import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -41,9 +42,7 @@ import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-
import junit.framework.Test;
-
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
@@ -121,7 +120,7 @@ public class DurableConsumerTest extends
}
private class MessagePublisher implements Runnable{
- private boolean shouldPublish = true;
+ private final boolean shouldPublish = true;
public void run(){
TopicConnectionFactory topicConnectionFactory = null;
@@ -170,13 +169,14 @@ public class DurableConsumerTest extends
Thread publisherThread = new Thread(new MessagePublisher());
publisherThread.start();
-
+ final List<SimpleTopicSubscriber> list = new ArrayList<SimpleTopicSubscriber>();
for (int i = 0; i < 100; i++) {
final int id = i;
Thread thread = new Thread(new Runnable(){
public void run(){
- new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
+ SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
+ list.add(s);
}
});
thread.start();
@@ -189,6 +189,9 @@ public class DurableConsumerTest extends
configurePersistence(broker);
broker.start();
Thread.sleep(10000);
+ for (SimpleTopicSubscriber s:list) {
+ s.closeConnection();
+ }
assertEquals(0, exceptions.size());
}
@@ -358,6 +361,7 @@ public class DurableConsumerTest extends
}
+ @Override
protected void setUp() throws Exception{
if (broker == null) {
broker = createBroker(true);
@@ -366,6 +370,7 @@ public class DurableConsumerTest extends
super.setUp();
}
+ @Override
protected void tearDown() throws Exception{
super.tearDown();
if (broker != null) {
@@ -392,11 +397,13 @@ public class DurableConsumerTest extends
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{
answer.setDeleteAllMessagesOnStartup(deleteStore);
KahaDBStore kaha = new KahaDBStore();
+ //kaha.setConcurrentStoreAndDispatchTopics(false);
File directory = new File("target/activemq-data/kahadb");
if (deleteStore) {
IOHelper.deleteChildren(directory);
}
kaha.setDirectory(directory);
+ //kaha.setMaxAsyncJobs(10);
answer.setPersistenceAdapter(kaha);
answer.addConnector(bindAddress);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java Thu May 20 12:02:10 2010
@@ -26,7 +26,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.thread.Scheduler;
import org.apache.log4j.Logger;
public class EmbeddedActiveMQ
@@ -39,6 +38,7 @@ public class EmbeddedActiveMQ
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
BrokerService brokerService = null;
+ Connection connection = null;
logger.info("Start...");
try
@@ -49,7 +49,7 @@ public class EmbeddedActiveMQ
logger.info("Broker '" + brokerService.getBrokerName() + "' is starting........");
brokerService.start();
ConnectionFactory fac = new ActiveMQConnectionFactory("vm://TestMQ");
- Connection connection = fac.createConnection();
+ connection = fac.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue("TEST.QUEUE");
MessageProducer producer = session.createProducer(queue);
@@ -71,12 +71,9 @@ public class EmbeddedActiveMQ
try
{
br.close();
- Scheduler scheduler = Scheduler.getInstance();
- scheduler.shutdown();
logger.info("Broker '" + brokerService.getBrokerName() + "' is stopping........");
- brokerService.stop();
- Scheduler.getInstance().shutdown();
-
+ connection.close();
+ brokerService.stop();
sleep(8);
logger.info(ThreadExplorer.show("Active threads after stop:"));
@@ -90,7 +87,7 @@ public class EmbeddedActiveMQ
logger.info("Waiting for list theads is greater then 1 ...");
int numTh = ThreadExplorer.active();
- while (numTh > 1)
+ while (numTh > 2)
{
sleep(3);
numTh = ThreadExplorer.active();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java Thu May 20 12:02:10 2010
@@ -18,15 +18,16 @@ package org.apache.activemq.network;
import javax.jms.MessageProducer;
import javax.jms.TemporaryQueue;
-
import org.apache.activemq.broker.BrokerService;
public class DuplexNetworkTest extends SimpleNetworkTest {
+ @Override
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/duplexLocalBroker.xml";
}
+ @Override
protected BrokerService createRemoteBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("remoteBroker");
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Thu May 20 12:02:10 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.network;
import java.net.URI;
-
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -30,9 +29,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
-
import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
@@ -174,11 +171,13 @@ public class SimpleNetworkTest extends T
}
}
+ @Override
protected void setUp() throws Exception {
super.setUp();
doSetUp();
}
+ @Override
protected void tearDown() throws Exception {
localBroker.deleteAllMessages();
remoteBroker.deleteAllMessages();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java Thu May 20 12:02:10 2010
@@ -20,10 +20,11 @@ package org.apache.activemq.usage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -31,6 +32,7 @@ import org.junit.Test;
public class MemoryUsageTest {
MemoryUsage underTest;
+ ThreadPoolExecutor executor;
@Test
public final void testPercentUsageNeedsNoThread() {
@@ -46,6 +48,7 @@ public class MemoryUsageTest {
public final void testAddUsageListenerStartsThread() throws Exception {
int activeThreadCount = Thread.activeCount();
underTest = new MemoryUsage();
+ underTest.setExecutor(executor);
underTest.setLimit(10);
underTest.start();
final CountDownLatch called = new CountDownLatch(1);
@@ -66,12 +69,24 @@ public class MemoryUsageTest {
@Before
public void setUp() throws Exception {
- underTest = new MemoryUsage();
+ underTest = new MemoryUsage();
+ this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "Usage Async Task");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ underTest.setExecutor(this.executor);
+
}
@After
public void tearDown() {
assertNotNull(underTest);
underTest.stop();
+ if (this.executor != null) {
+ this.executor.shutdownNow();
+ }
}
}