You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/18 05:28:52 UTC
svn commit: r892128 [1/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-dispatcher/src/main/java/org/apache/activemq/dis...
Author: chirino
Date: Fri Dec 18 04:28:49 2009
New Revision: 892128
URL: http://svn.apache.org/viewvc?rev=892128&view=rev
Log:
Better lifecycle handling.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java
- copied, changed from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java
- copied, changed from r891866, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Fri Dec 18 04:28:49 2009
@@ -138,7 +138,7 @@
throw new IllegalStateException("Can only start a broker that is in the "+State.CONFIGURATION +" state. Broker was "+state.get());
}
try {
- dispatcher.retain();
+ dispatcher.resume();
synchronized(virtualHosts) {
for (VirtualHost virtualHost : virtualHosts.values()) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Dec 18 04:28:49 2009
@@ -97,7 +97,7 @@
@Before
public void setUp() throws Exception {
dispatcher = createDispatcher();
- dispatcher.retain();
+ dispatcher.resume();
if (tcp) {
sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Fri Dec 18 04:28:49 2009
@@ -64,7 +64,7 @@
protected void startServices() throws Exception {
dispatcher = createDispatcher();
- dispatcher.retain();
+ dispatcher.resume();
database = new BrokerDatabase(createStore());
database.setDispatcher(dispatcher);
database.start();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Fri Dec 18 04:28:49 2009
@@ -20,14 +20,11 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public interface DispatchObject extends Retained {
+public interface DispatchObject extends Suspendable {
public <Context> Context getContext();
public <Context> void setContext(Context context);
- public void suspend();
- public void resume();
-
public void setTargetQueue(DispatchQueue queue);
public DispatchQueue getTargetQueue();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec 18 04:28:49 2009
@@ -30,7 +30,7 @@
private static Dispatcher create() {
Dispatcher rc = new DispatcherConfig().createDispatcher();
- rc.retain();
+ rc.resume();
return rc;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Fri Dec 18 04:28:49 2009
@@ -23,7 +23,7 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public interface Dispatcher extends Retained {
+public interface Dispatcher extends Suspendable {
public DispatchQueue getGlobalQueue();
public DispatchQueue getGlobalQueue(DispatchPriority priority);
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java (from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java&r1=891451&r2=892128&rev=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java Fri Dec 18 04:28:49 2009
@@ -20,7 +20,7 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public interface Retained {
+public interface Retainable {
public void retain();
public void release();
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,6 @@
+package org.apache.activemq.dispatch;
+
+public interface Suspendable extends Retainable {
+ public void suspend();
+ public void resume();
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Fri Dec 18 04:28:49 2009
@@ -25,40 +25,30 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-abstract public class AbstractDispatchObject extends BaseRetained implements DispatchObject {
+abstract public class AbstractDispatchObject extends BaseSuspendable implements DispatchObject {
protected volatile Object context;
protected volatile DispatchQueue targetQueue;
- protected final AtomicInteger suspendCounter = new AtomicInteger();
@SuppressWarnings("unchecked")
public <Context> Context getContext() {
+ assertRetained();
return (Context) context;
}
public <Context> void setContext(Context context) {
+ assertRetained();
this.context = context;
}
public void setTargetQueue(DispatchQueue targetQueue) {
+ assertRetained();
this.targetQueue = targetQueue;
}
public DispatchQueue getTargetQueue() {
+ assertRetained();
return this.targetQueue;
}
-
- public void resume() {
- if( suspendCounter.decrementAndGet() == 0 ) {
- onResume();
- }
- }
-
- public void suspend() {
- suspendCounter.incrementAndGet();
- }
-
- protected void onResume() {
- }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Fri Dec 18 04:28:49 2009
@@ -39,7 +39,7 @@
protected final AtomicInteger executeCounter = new AtomicInteger();
protected final AtomicLong externalQueueSize = new AtomicLong();
- protected final AtomicLong queueSize = new AtomicLong();
+ protected final AtomicLong size = new AtomicLong();
protected final ConcurrentLinkedQueue<Runnable> externalQueue = new ConcurrentLinkedQueue<Runnable>();
private final LinkedList<Runnable> localQueue = new LinkedList<Runnable>();
@@ -50,7 +50,6 @@
public AbstractSerialDispatchQueue(String label, DispatchOption...options) {
this.label = label;
this.options = set(options);
- retain();
}
static private Set<DispatchOption> set(DispatchOption[] options) {
@@ -64,28 +63,37 @@
}
@Override
+ protected void onStartup() {
+ dispatchSelfAsync();
+ }
+
+ @Override
protected void onResume() {
dispatchSelfAsync();
}
public void execute(Runnable command) {
+ assertRetained();
dispatchAsync(command);
}
public void dispatchAsync(Runnable runnable) {
assert runnable != null;
+ assertRetained();
- if( queueSize.getAndIncrement()==0 ) {
- retain();
- }
+ long sizeWas = size.getAndIncrement();
// We can take a shortcut...
if( executing.get()!=null ) {
localQueue.add(runnable);
} else {
+ if( sizeWas==0 ) {
+ retain();
+ }
+
long lastSize = externalQueueSize.getAndIncrement();
externalQueue.add(runnable);
- if( lastSize == 0 && suspendCounter.get()<=0 ) {
+ if( lastSize == 0 && suspended.get()<=0 ) {
dispatchSelfAsync();
}
}
@@ -122,7 +130,7 @@
try {
Runnable runnable;
- while( suspendCounter.get() <= 0 ) {
+ while( suspended.get() <= 0 ) {
if( (runnable = localQueue.poll())!=null ) {
counter++;
@@ -153,24 +161,30 @@
}
} finally {
- long size = queueSize.addAndGet(-counter);
- if( size==0 ) {
- release();
- } else {
- dispatchSelfAsync();
+ if( counter>0 ) {
+ long lsize = size.addAndGet(-counter);
+ assert lsize >= 0;
+ if( lsize==0 ) {
+ release();
+ } else {
+ dispatchSelfAsync();
+ }
}
}
}
public void dispatchSync(Runnable runnable) throws InterruptedException {
+ assertRetained();
dispatchApply(1, runnable);
}
public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+ assertRetained();
QueueSupport.dispatchApply(this, iterations, runnable);
}
public Set<DispatchOption> getOptions() {
+ assertRetained();
return options;
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java (from r891866, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java&r1=891866&r2=892128&rev=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java Fri Dec 18 04:28:49 2009
@@ -19,51 +19,50 @@
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
+import static java.lang.String.*;
+
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class BaseRetained {
-
- final protected AtomicInteger retainCounter = new AtomicInteger(0);
+public class BaseRetainable {
+
+ final protected AtomicInteger retained = new AtomicInteger(1);
final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>(1);
public void addShutdownWatcher(Runnable shutdownHandler) {
- synchronized(shutdownHandlers) {
+ assertRetained();
+ synchronized (shutdownHandlers) {
shutdownHandlers.add(shutdownHandler);
}
}
-
+
public void retain() {
- if( retainCounter.getAndIncrement() == 0 ) {
- startup();
- }
+ assertRetained();
+ retained.getAndIncrement();
}
public void release() {
- if( retainCounter.decrementAndGet()==0 ) {
- shutdown();
+ assertRetained();
+ if (retained.decrementAndGet() == 0) {
+ onShutdown();
}
}
-
- protected boolean isShutdown()
- {
- return retainCounter.get() <= 0;
- }
- /**
- * Subclasses should override if they want to do some startup processing.
- */
- protected void startup() {
+ final protected void assertRetained() {
+ assert retained.get() > 0 : format("%s: Use of object not allowed after it has been released", this.toString());
}
+ public boolean isShutdown() {
+ return retained.get() <= 0;
+ }
/**
- * Subclasses should override if they want to do clean up.
+ * Subclasses should override if they want to do clean up.
*/
- protected void shutdown() {
+ protected void onShutdown() {
ArrayList<Runnable> copy;
- synchronized(shutdownHandlers) {
+ synchronized (shutdownHandlers) {
copy = new ArrayList<Runnable>(shutdownHandlers);
shutdownHandlers.clear();
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.dispatch.Suspendable;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BaseSuspendable extends BaseRetainable implements Suspendable {
+
+ protected final AtomicBoolean startup = new AtomicBoolean(true);
+ protected final AtomicInteger suspended = new AtomicInteger();
+
+ public void resume() {
+ assertRetained();
+ if( suspended.decrementAndGet() == 0 ) {
+ if( startup.compareAndSet(true, false) ) {
+ onStartup();
+ } else {
+ onResume();
+ }
+ }
+ }
+
+ public void suspend() {
+ assertRetained();
+ if( suspended.getAndIncrement()==0 ) {
+ onSuspend();
+ }
+ }
+
+ protected void onStartup() {
+ }
+
+ protected void onSuspend() {
+ }
+
+ protected void onResume() {
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Fri Dec 18 04:28:49 2009
@@ -31,12 +31,12 @@
import org.apache.activemq.dispatch.DispatchSource;
import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatcherConfig;
-import org.apache.activemq.dispatch.internal.BaseRetained;
+import org.apache.activemq.dispatch.internal.BaseSuspendable;
import org.apache.activemq.dispatch.internal.nio.NIODispatchSource;
import static org.apache.activemq.dispatch.DispatchPriority.*;
-final public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
+final public class AdvancedDispatcher extends BaseSuspendable implements Dispatcher {
public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
@@ -61,13 +61,11 @@
globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
}
loadBalancer = new SimpleLoadBalancer();
+ super.suspend();
}
-
- /**
- * @see org.apache.activemq.dispatch.internal.advanced.DispatcherThread#start()
- */
- protected void startup() {
- loadBalancer.start();
+
+ @Override
+ protected void onStartup() {
for (int i = 0; i < size; i++) {
DispatcherThread dispatacher;
try {
@@ -78,16 +76,28 @@
e.printStackTrace();
}
}
+
+ }
+
+ @Override
+ public void retain() {
+ super.retain();
+ }
+
+ @Override
+ public void suspend() {
+ throw new UnsupportedOperationException();
}
- protected void shutdown() {
+ @Override
+ protected void onShutdown() {
Runnable countDown = new Runnable() {
AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
public void run() {
if (shutdownCountDown.decrementAndGet() == 0) {
// Notify any registered shutdown watchers.
- AdvancedDispatcher.super.shutdown();
+ AdvancedDispatcher.super.onShutdown();
}
}
};
@@ -97,6 +107,7 @@
}
loadBalancer.stop();
}
+
/**
* A Dispatcher must call this to indicate that is has started it's dispatch
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java Fri Dec 18 04:28:49 2009
@@ -38,7 +38,6 @@
private Runnable cancelHandler;
public NIODispatchSource() {
- super.retain();
}
/**
@@ -172,7 +171,7 @@
}
@Override
- protected void shutdown() {
+ protected void onShutdown() {
// actual close can only happen on the owning dispatch thread:
if (key != null && key.isValid()) {
// This will make sure that the key is removed
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Fri Dec 18 04:28:49 2009
@@ -26,6 +26,7 @@
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.internal.QueueSupport;
+import org.apache.activemq.util.IntrospectionSupport;
/**
*
@@ -143,4 +144,9 @@
return null;
}
+ @Override
+ public String toString() {
+ return IntrospectionSupport.toString(this);
+ }
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Fri Dec 18 04:28:49 2009
@@ -23,6 +23,7 @@
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+import org.apache.activemq.util.IntrospectionSupport;
/**
*
@@ -31,60 +32,61 @@
public final class SerialDispatchQueue extends AbstractSerialDispatchQueue implements SimpleQueue {
private final SimpleDispatcher dispatcher;
- private volatile boolean stickToThreadOnNextDispatch;
- private volatile boolean stickToThreadOnNextDispatchRequest;
-
- SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption...options) {
+
+ private volatile boolean stickToThreadOnNextDispatch;
+
+ private volatile boolean stickToThreadOnNextDispatchRequest;
+
+ SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption... options) {
super(label, options);
this.dispatcher = dispatcher;
- if( getOptions().contains(DispatchOption.STICK_TO_DISPATCH_THREAD) ) {
- stickToThreadOnNextDispatch=true;
+ if (getOptions().contains(DispatchOption.STICK_TO_DISPATCH_THREAD)) {
+ stickToThreadOnNextDispatch = true;
}
}
@Override
public void setTargetQueue(DispatchQueue targetQueue) {
- GlobalDispatchQueue global = ((SimpleQueue)targetQueue).isGlobalDispatchQueue();
- if( getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) && global!=null ) {
- stickToThreadOnNextDispatchRequest=true;
+ assertRetained();
+ GlobalDispatchQueue global = ((SimpleQueue) targetQueue).isGlobalDispatchQueue();
+ if (getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) && global != null) {
+ stickToThreadOnNextDispatchRequest = true;
}
super.setTargetQueue(targetQueue);
}
-
+
@Override
public void dispatchAsync(Runnable runnable) {
-
- if( stickToThreadOnNextDispatchRequest ) {
+ assertRetained();
+
+ if (stickToThreadOnNextDispatchRequest) {
SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
- if( current!=null ) {
+ if (current != null) {
SimpleQueue parent;
- while( (parent = current.getTargetQueue()) !=null ) {
+ while ((parent = current.getTargetQueue()) != null) {
current = parent;
}
- if( current.isThreadDispatchQueue()==null ) {
- System.out.println("crap");
- }
super.setTargetQueue(current);
- stickToThreadOnNextDispatchRequest=false;
+ stickToThreadOnNextDispatchRequest = false;
}
}
super.dispatchAsync(runnable);
}
-
+
public void run() {
SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
SimpleDispatcher.CURRENT_QUEUE.set(this);
-
+
try {
- if( stickToThreadOnNextDispatch ) {
- stickToThreadOnNextDispatch=false;
+ if (stickToThreadOnNextDispatch) {
+ stickToThreadOnNextDispatch = false;
GlobalDispatchQueue global = current.isGlobalDispatchQueue();
- if( global!=null ) {
+ if (global != null) {
setTargetQueue(global.getTargetQueue());
}
}
-
+
DispatcherThread thread = DispatcherThread.currentDispatcherThread();
dispatch(thread.executionCounter);
} finally {
@@ -92,8 +94,9 @@
}
}
-
+
public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+ assertRetained();
dispatcher.timerThread.addRelative(runnable, this, delay, unit);
}
@@ -116,9 +119,14 @@
public ThreadDispatchQueue isThreadDispatchQueue() {
return null;
}
-
+
public SimpleQueue getTargetQueue() {
+ assertRetained();
return (SimpleQueue) targetQueue;
}
+ @Override
+ public String toString() {
+ return IntrospectionSupport.toString(this, "label", "size", "suspended", "retained");
+ }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Fri Dec 18 04:28:49 2009
@@ -21,14 +21,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatcherConfig;
-import org.apache.activemq.dispatch.DispatchOption;
-import org.apache.activemq.dispatch.internal.BaseRetained;
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+import org.apache.activemq.dispatch.internal.BaseSuspendable;
import static org.apache.activemq.dispatch.DispatchPriority.*;
@@ -37,7 +37,7 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-final public class SimpleDispatcher extends BaseRetained implements Dispatcher {
+final public class SimpleDispatcher extends BaseSuspendable implements Dispatcher {
public final static ThreadLocal<SimpleQueue> CURRENT_QUEUE = new ThreadLocal<SimpleQueue>();
@@ -58,6 +58,7 @@
globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
}
dispatchers = new DispatcherThread[config.getThreads()];
+ super.suspend();
}
public DispatchQueue getMainQueue() {
@@ -107,8 +108,14 @@
}
}
}
+
+ @Override
+ public void suspend() {
+ throw new UnsupportedOperationException();
+ }
- protected void startup() {
+ @Override
+ protected void onStartup() {
for (int i = 0; i < dispatchers.length; i++) {
dispatchers[i] = new DispatcherThread(this, i);
dispatchers[i].start();
@@ -117,7 +124,8 @@
timerThread.start();
}
- public void shutdown() {
+ @Override
+ public void onShutdown() {
Runnable countDown = new Runnable() {
AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
@@ -125,7 +133,7 @@
public void run() {
if (shutdownCountDown.decrementAndGet() == 0) {
// Notify any registered shutdown watchers.
- SimpleDispatcher.super.shutdown();
+ SimpleDispatcher.super.onShutdown();
}
throw new DispatcherThread.Shutdown();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Fri Dec 18 04:28:49 2009
@@ -41,7 +41,7 @@
public void testActorInvocation() throws Exception {
Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
- advancedSystem.retain();
+ advancedSystem.resume();
DispatchQueue queue = advancedSystem.createSerialQueue("test");
TestObjectActor actor = ActorProxy.create(TestObjectActor.class, new TestObject(), queue);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec 18 04:28:49 2009
@@ -32,7 +32,7 @@
public static void main(String[] args) throws Exception {
Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
- advancedSystem.retain();
+ advancedSystem.resume();
benchmarkGlobal("advanced global queue", advancedSystem);
benchmarkSerial("advanced private serial queue", advancedSystem);
@@ -44,7 +44,7 @@
DispatcherConfig config = new DispatcherConfig();
config.setThreads(6);
Dispatcher simpleSystem = new SimpleDispatcher(config);
- simpleSystem.retain();
+ simpleSystem.resume();
benchmarkGlobal("simple global queue", simpleSystem);
benchmarkSerial("simple private serial queue", simpleSystem);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Fri Dec 18 04:28:49 2009
@@ -32,7 +32,7 @@
public static void main(String[] args) throws Exception {
AdvancedDispatcher dispatcher = new AdvancedDispatcher(new DispatcherConfig());
- dispatcher.retain();
+ dispatcher.resume();
// warm the JIT up..
benchmarkWork(dispatcher, 100000);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Dec 18 04:28:49 2009
@@ -92,7 +92,7 @@
protected void startServices() throws Exception {
dispatcher = createDispatcher();
- dispatcher.retain();
+ dispatcher.resume();
database = new BrokerDatabase(createStore());
database.setDispatcher(dispatcher);
if( TEST_MAX_STORE_LATENCY ) {
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportHandler;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class BaseConnection {
+
+ protected interface ConnectionStateActor extends TransportHandler {
+ void onStart();
+ void onStop();
+ }
+
+ protected String name;
+ protected Dispatcher dispatcher;
+
+ protected DispatchQueue dispatchQueue;
+ protected ConnectionStateActor actor;
+
+ public void start() {
+ dispatchQueue = dispatcher.createSerialQueue(name);
+ createActor();
+ actor.onStart();
+ }
+
+ public void stop() throws InterruptedException {
+ actor.onStop();
+ RunnableCountDownLatch done = new RunnableCountDownLatch(1);
+ dispatchQueue.addShutdownWatcher(done);
+ dispatchQueue.release();
+ done.await();
+ }
+
+ abstract protected void createActor();
+
+ // The actor pattern ensures that this object is only accessed in
+ // serial execution context. So synchronization is not required.
+ // It also places a restriction that all operations should
+ // avoid mutex contention and avoid blocking IO calls.
+ protected class ConnectionState implements ConnectionStateActor {
+
+ final protected WindowController inboundSessionWindow = new WindowController();
+ final protected WindowLimiter outboundSessionWindow = new WindowLimiter();
+ final protected WindowLimiter outboundTransportWindow = new WindowLimiter();
+
+ protected Transport transport;
+ protected boolean disconnected;
+ protected Exception failure;
+
+ ConnectionState() {
+ outboundTransportWindow.size(100);
+ }
+
+ public void onStart() {
+ dispatchQueue.addShutdownWatcher(new Runnable() {
+ public void run() {
+ transport.setTargetQueue(dispatcher.getGlobalQueue());
+ transport.release();
+ }
+ });
+ transport.setTargetQueue(dispatchQueue);
+ transport.setHandler(this);
+ transport.resume();
+ }
+
+ public void onStop() {
+ }
+
+ public void onConnect() {
+ sendFlowControl(inboundSessionWindow.maxSize(1000));
+ }
+
+ public void onDisconnect() {
+ disconnected = true;
+ }
+
+ public void onFailure(Exception failure) {
+ failure.printStackTrace();
+ this.failure = failure;
+ }
+
+ public void onRecevie(Object command) {
+ if (command.getClass() == Message.class) {
+ // We should not be getting messages
+ // when the window is closed..
+ if( inboundSessionWindow.isClosed() ) {
+ onFailure(new Exception("Session overrun: " + command));
+ }
+ outboundSessionWindow.change(-1);
+ onReceiveMessage((Message) command);
+ } else if (command.getClass() == FlowControlBean.class || command.getClass() == FlowControlBuffer.class) {
+ onReceiveFlowControl((FlowControl) command);
+ } else if (command.getClass() == String.class) {
+ onReceiveString((String)command);
+ } else if (command.getClass() == DestinationBuffer.class || command.getClass() == DestinationBean.class) {
+ onReceiveDestination((Destination)command);
+ } else {
+ onFailure(new Exception("Unrecognized command: " + command));
+ }
+ }
+
+ public void sessionSend(Message message) {
+ transportSend(message);
+ }
+
+ protected void onReceiveDestination(Destination command) {
+ }
+
+ protected void onReceiveString(String command) {
+ }
+
+ protected void onReceiveMessage(Message msg) {
+ sendFlowControl(inboundSessionWindow.processed(1));
+ }
+
+ private void sendFlowControl(int credits) {
+ if( credits!=0 ) {
+ FlowControlBean fc = new FlowControlBean();
+ fc.setCredit(credits);
+ transportSend(fc);
+ }
+ }
+
+ public void transportSend(Object message) {
+ outboundTransportWindow.change(-1);
+ transport.send(message, onSendCompleted, dispatchQueue);
+ }
+
+ private final Runnable onSendCompleted = new Runnable() {
+ public void run() {
+ boolean wasClosed = outboundTransportWindow.isClosed();
+ outboundTransportWindow.change(1);
+ if( !wasClosed && !isSessionSendBlocked() ) {
+ onSessionResume();
+ }
+ }
+ };
+
+ protected void onReceiveFlowControl(FlowControl command) {
+ boolean wasClosed = outboundSessionWindow.isClosed();
+ outboundSessionWindow.change(command.getCredit());
+ if( wasClosed && !isSessionSendBlocked() ) {
+ onSessionResume();
+ }
+
+ }
+
+ protected boolean isSessionSendBlocked() {
+ return outboundTransportWindow.isClosed() || outboundSessionWindow.isClosed();
+ }
+
+ protected void onSessionResume() {
+ }
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java Fri Dec 18 04:28:49 2009
@@ -28,7 +28,7 @@
*/
public class BrokerConnection extends BaseConnection implements DeliveryTarget {
- interface BrokerProtocol extends Protocol {
+ interface BrokerConnectionStateActor extends ConnectionStateActor {
public void onBrokerDispatch(Message msg, Runnable r);
}
@@ -45,22 +45,22 @@
}
private MockBroker broker;
- private BrokerProtocol brokerActor;
+ private BrokerConnectionStateActor brokerActor;
private Transport transport;
private int priorityLevels;
protected void createActor() {
- actor = brokerActor = ActorProxy.create(BrokerProtocol.class, new BrokerProtocolImpl(), dispatchQueue);
+ actor = brokerActor = ActorProxy.create(BrokerConnectionStateActor.class, new BrokerConnectionState(), dispatchQueue);
}
- protected class BrokerProtocolImpl extends ProtocolImpl implements BrokerProtocol {
+ protected class BrokerConnectionState extends ConnectionState implements BrokerConnectionStateActor {
String name;
@Override
- public void start() {
+ public void onStart() {
this.transport = BrokerConnection.this.transport;
- super.start();
+ super.onStart();
}
// TODO: to increase fairness: we might want to have a pendingQueue per sender
@@ -81,7 +81,7 @@
// is configured with.
broker.router.route(msg, dispatchQueue, new Runnable() {
public void run() {
- BrokerProtocolImpl.super.onReceiveMessage(msg);
+ BrokerConnectionState.super.onReceiveMessage(msg);
}
});
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java Fri Dec 18 04:28:49 2009
@@ -32,15 +32,15 @@
}
protected void createActor() {
- actor = ActorProxy.create(Protocol.class, new ClientProtocolImpl(), dispatchQueue);
+ actor = ActorProxy.create(ConnectionStateActor.class, new ClientConnectionState(), dispatchQueue);
}
- protected class ClientProtocolImpl extends ProtocolImpl {
+ protected class ClientConnectionState extends ConnectionState {
@Override
- public void start() {
+ public void onStart() {
transport = TransportFactorySystem.connect(dispatcher, connectUri);
- super.start();
+ super.onStart();
}
public void onConnect() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java Fri Dec 18 04:28:49 2009
@@ -37,16 +37,16 @@
private final MetricCounter rate = new MetricCounter();
protected void createActor() {
- actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+ actor = ActorProxy.create(ConnectionStateActor.class, new ConsumerConnectionState(), dispatchQueue);
}
- class ProducerProtocolImpl extends ClientProtocolImpl {
+ class ConsumerConnectionState extends ClientConnectionState {
@Override
- public void start() {
+ public void onStart() {
rate.name("Consumer " + name + " Rate");
totalConsumerRate.add(rate);
- super.start();
+ super.onStart();
}
@Override
@@ -61,7 +61,7 @@
dispatchQueue.dispatchAfter(new Runnable() {
public void run() {
rate.increment();
- ProducerProtocolImpl.super.onReceiveMessage(msg);
+ ConsumerConnectionState.super.onReceiveMessage(msg);
}
}, thinkTime, TimeUnit.MILLISECONDS);
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchObjectFilter implements DispatchObject {
+ protected DispatchObject next;
+
+ public DispatchObjectFilter() {
+ }
+
+ public DispatchObjectFilter(DispatchObject next) {
+ this.next = next;
+ }
+
+ public void addShutdownWatcher(Runnable shutdownWatcher) {
+ next.addShutdownWatcher(shutdownWatcher);
+ }
+ public <Context> Context getContext() {
+ return next.getContext();
+ }
+ public DispatchQueue getTargetQueue() {
+ return next.getTargetQueue();
+ }
+ public void release() {
+ next.release();
+ }
+ public void resume() {
+ next.resume();
+ }
+ public void retain() {
+ next.retain();
+ }
+ public <Context> void setContext(Context context) {
+ next.setContext(context);
+ }
+ public void setTargetQueue(DispatchQueue queue) {
+ next.setTargetQueue(queue);
+ }
+ public void suspend() {
+ next.suspend();
+ }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java Fri Dec 18 04:28:49 2009
@@ -18,6 +18,8 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.DispatchQueue;
@@ -29,6 +31,8 @@
import org.apache.activemq.queue.actor.transport.TransportServer;
import org.apache.activemq.queue.actor.transport.TransportServerHandler;
+import static java.util.concurrent.TimeUnit.*;
+
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -80,16 +84,18 @@
transportServer.release();
for (BrokerConnection connection : connections) {
- connection.release();
+ connection.stop();
}
for (BrokerConnection connection : brokerConnections) {
- connection.release();
+ connection.stop();
}
for (MockQueue queue : queues.values()) {
queue.stop();
}
}
+ CountDownLatch bindLatch = new CountDownLatch(1);
+
public final void startServices() throws Exception {
brokerDispatchQueue = dispatcher.createSerialQueue("broker");
transportServer = TransportFactorySystem.bind(dispatcher, uri);
@@ -100,12 +106,24 @@
for (MockQueue queue : queues.values()) {
queue.start();
}
+
+ for (BrokerConnection connection : brokerConnections) {
+ connection.start();
+ }
+
+ if( !bindLatch.await(5, SECONDS) ) {
+ throw new Exception("bind timeout");
+ }
+ }
+
+ public void onFailure(Exception error) {
+ bindLatch.countDown();
+ System.out.println("Accept error: " + error);
+ error.printStackTrace();
}
public void onBind() {
- for (BrokerConnection connection : brokerConnections) {
- connection.retain();
- }
+ bindLatch.countDown();
}
public void onUnbind() {
@@ -117,18 +135,15 @@
connection.setTransport(transport);
connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
connection.setDispatcher(dispatcher);
+ connection.setName("handler: "+transport.getRemoteAddress());
connections.add(connection);
try {
- connection.retain();
+ connection.start();
} catch (Exception e1) {
onFailure(e1);
}
}
- public void onFailure(Exception error) {
- System.out.println("Accept error: " + error);
- error.printStackTrace();
- }
public Dispatcher getDispatcher() {
return dispatcher;
@@ -181,7 +196,7 @@
broker.setName("Broker");
broker.createDispatcher();
try {
- broker.getDispatcher().retain();
+ broker.getDispatcher().resume();
broker.startServices();
} catch (Exception e) {
// TODO Auto-generated catch block
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java Fri Dec 18 04:28:49 2009
@@ -18,8 +18,6 @@
import java.util.ArrayList;
-import junit.framework.TestCase;
-
import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.flow.Commands.Destination;
@@ -27,12 +25,16 @@
import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class MockBrokerTest extends TestCase {
+public class MockBrokerTest {
protected static final int PERFORMANCE_SAMPLES = 3;
protected static final int SAMPLING_FREQUENCY = 5;
@@ -58,8 +60,7 @@
protected String receiveBrokerURI;
// Set's the number of threads to use:
- protected static final boolean SEPARATE_CLIENT_DISPATCHER = false;
- protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+ static protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
protected boolean usePartitionedQueue = false;
protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
@@ -67,7 +68,7 @@
protected MockBroker rcvBroker;
protected MockClient client;
- protected Dispatcher dispatcher;
+ protected static Dispatcher dispatcher;
static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
public Long map(Message element) {
@@ -82,8 +83,22 @@
}
};
- @Override
- protected void setUp() throws Exception {
+ @BeforeClass
+ static public void setUpSuite() throws Exception {
+ dispatcher = DispatcherConfig.create("test", threadsPerDispatcher);
+ dispatcher.resume();
+ }
+
+ @AfterClass
+ static public void tearDownSuite() throws Exception {
+ if( dispatcher!=null ) {
+ dispatcher.release();
+ }
+ }
+
+
+ @Before
+ public void setUp() throws Exception {
if (tcp) {
sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
@@ -98,10 +113,7 @@
}
}
- protected Dispatcher createDispatcher(String name) {
- return DispatcherConfig.create("test", threadsPerDispatcher);
- }
-
+ @Test
public void test_1_1_0() throws Exception {
client = new MockClient();
@@ -109,67 +121,73 @@
client.setDestCount(1);
client.setNumConsumers(0);
- createConnections(1);
+ createConnections("test_1_1_0", 1);
runTestCase();
}
+ @Test
public void test_1_1_1() throws Exception {
client = new MockClient();
client.setNumProducers(1);
client.setDestCount(1);
client.setNumConsumers(1);
- createConnections(1);
+ createConnections("test_1_1_1", 1);
runTestCase();
}
+ @Test
public void test_10_10_10() throws Exception {
client = new MockClient();
client.setNumProducers(FANIN_COUNT);
client.setDestCount(FANIN_COUNT);
client.setNumConsumers(FANOUT_COUNT);
- createConnections(FANIN_COUNT);
+ createConnections("test_10_10_10", FANIN_COUNT);
runTestCase();
}
+ @Test
public void test_10_1_10() throws Exception {
client = new MockClient();
client.setNumProducers(FANIN_COUNT);
client.setDestCount(1);
client.setNumConsumers(FANOUT_COUNT);
- createConnections(1);
+ createConnections("test_10_1_10", 1);
runTestCase();
}
+ @Test
public void test_10_1_1() throws Exception {
client = new MockClient();
client.setNumProducers(FANIN_COUNT);
client.setDestCount(1);
client.setNumConsumers(1);
- createConnections(1);
+ createConnections("test_1_1_1", 1);
runTestCase();
}
+ @Test
public void test_1_1_10() throws Exception {
client = new MockClient();
client.setNumProducers(1);
client.setDestCount(1);
client.setNumConsumers(FANOUT_COUNT);
- createConnections(1);
+ createConnections("test_1_1_10", 1);
runTestCase();
}
+ @Test
public void test_2_2_2() throws Exception {
client = new MockClient();
client.setNumProducers(2);
client.setDestCount(2);
client.setNumConsumers(2);
- createConnections(2);
+ createConnections("test_2_2_2", 2);
runTestCase();
}
@@ -180,25 +198,27 @@
*
* @throws Exception
*/
+ @Test
public void test_2_2_2_SlowConsumer() throws Exception {
client = new MockClient();
client.setNumProducers(2);
client.setDestCount(2);
client.setNumConsumers(2);
- createConnections(2);
+ createConnections("test_2_2_2_SlowConsumer", 2);
client.consumer(0).setThinkTime(50);
runTestCase();
}
+ @Test
public void test_2_2_2_Selector() throws Exception {
client = new MockClient();
client.setNumProducers(2);
client.setDestCount(2);
client.setNumConsumers(2);
- createConnections(2);
+ createConnections("test_2_2_2_Selector", 2);
// Add properties to match producers to their consumers
for (int i = 0; i < 2; i++) {
@@ -216,6 +236,7 @@
*
* @throws Exception
*/
+ @Test
public void test_2_1_1_HighPriorityProducer() throws Exception {
client = new MockClient();
@@ -223,7 +244,7 @@
client.setNumConsumers(1);
client.setDestCount(1);
- createConnections(1);
+ createConnections("test_2_1_1_HighPriorityProducer", 1);
ProducerConnection producer = client.producer(0);
client.includeInRateReport(producer);
producer.setPriority(1);
@@ -240,13 +261,14 @@
*
* @throws Exception
*/
+ @Test
public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
client = new MockClient();
client.setNumProducers(2);
client.setNumConsumers(1);
client.setDestCount(1);
- createConnections(1);
+ createConnections("test_2_1_1_MixedHighPriorityProducer", 1);
ProducerConnection producer = client.producer(0);
producer.setPriority(1);
producer.setPriorityMod(3);
@@ -256,10 +278,7 @@
runTestCase();
}
- private void createConnections(int destCount) throws Exception {
-
- dispatcher = createDispatcher("BrokerDispatcher");
- dispatcher.retain();
+ private void createConnections(String testName, int destCount) throws Exception {
if (multibroker) {
sendBroker = createBroker("SendBroker", sendBrokerURI);
@@ -288,16 +307,8 @@
}
}
- Dispatcher clientDispatcher = null;
- if (SEPARATE_CLIENT_DISPATCHER) {
- clientDispatcher = createDispatcher("ClientDispatcher");
- clientDispatcher.retain();
- } else {
- clientDispatcher = dispatcher;
- }
-
// Configure Client:
- client.setDispatcher(clientDispatcher);
+ client.setDispatcher(dispatcher);
client.setNumPriorities(PRIORITY_LEVELS);
client.setSendBrokerURI(sendBroker.getUri());
client.setReceiveBrokerURI(rcvBroker.getUri());
@@ -305,8 +316,7 @@
client.setSamplingFrequency(1000 * SAMPLING_FREQUENCY);
client.setThreadsPerDispatcher(threadsPerDispatcher);
client.setPtp(ptp);
- client.setTestName(getName());
-
+ client.setTestName(testName);
client.createConnections();
}
@@ -336,19 +346,15 @@
try {
client.runTest();
} finally {
+ System.out.println("Shutting down..");
stopServices();
}
}
private void stopServices() throws Exception {
-
for (MockBroker broker : brokers) {
broker.stopServices();
}
-
- if (dispatcher != null) {
- dispatcher.release();
- }
}
private void startServices() throws Exception {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java Fri Dec 18 04:28:49 2009
@@ -227,8 +227,6 @@
}
public void runTest() throws Exception {
- getDispatcher().retain();
-
// Start 'em up.
startServices();
try {
@@ -241,21 +239,21 @@
private void startServices() throws Exception {
// BaseTestConnection.setInShutdown(false, dispatcher);
for (ConsumerConnection connection : consumers) {
- connection.retain();
+ connection.start();
}
for (ProducerConnection connection : producers) {
- connection.retain();
+ connection.start();
}
}
private void stopServices() throws Exception {
// BaseTestConnection.setInShutdown(true, dispatcher);
for (ProducerConnection connection : producers) {
- connection.release();
+ connection.stop();
}
for (ConsumerConnection connection : consumers) {
- connection.release();
+ connection.stop();
}
}
@@ -311,7 +309,7 @@
System.out.println(IntrospectionSupport.toString(test));
try
{
- test.getDispatcher().retain();
+ test.getDispatcher().resume();
test.createConnections();
test.runTest();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java Fri Dec 18 04:28:49 2009
@@ -40,16 +40,17 @@
private AtomicLong messageIdGenerator;
protected void createActor() {
- actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+ actor = ActorProxy.create(ConnectionStateActor.class, new ProducerConnectionState(), dispatchQueue);
}
- class ProducerProtocolImpl extends ClientProtocolImpl {
+ class ProducerConnectionState extends ClientConnectionState {
private String filler;
private int payloadCounter;
+ private boolean stopped;
@Override
- public void start() {
+ public void onStart() {
rate.name("Producer " + name + " Rate");
totalProducerRate.add(rate);
@@ -60,7 +61,7 @@
}
filler = sb.toString();
}
- super.start();
+ super.onStart();
}
@Override
@@ -73,8 +74,14 @@
produceMessages();
}
+ @Override
+ public void onStop() {
+ stopped = true;
+ super.onStop();
+ }
+
private void produceMessages() {
- while( !isSessionSendBlocked() ) {
+ while( !isSessionSendBlocked() && !stopped ) {
int p = priority;
if (priorityMod > 0) {
p = payloadCounter % priorityMod == 0 ? 0 : p;
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+/**
+ * A {@link WindowLimiter} that accumulates processed counts and applies them to the window size once they reach half of the max size.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class WindowController extends WindowLimiter {
+
+ private int maxSize; // maximum window size last set via maxSize(int)
+ private int processed; // count accumulated since the last credit
+ private int creditsAt; // accumulated count at which a credit is issued (maxSize/2)
+
+ public int processed(int count) { // returns the amount credited to the window, or 0 if nothing was credited
+ int rc = 0;
+ processed += count;
+ if( processed >= creditsAt ) {
+ change(processed); // apply everything accumulated so far to the window size
+ rc = processed;
+ processed = 0;
+ }
+ return rc;
+ }
+
+ int maxSize(int newMaxSize) { // resizes the window and returns the size delta that was applied
+ int change = newMaxSize-maxSize;
+ this.maxSize=newMaxSize;
+ this.creditsAt = maxSize/2; // credit threshold is half of the new max size
+ change(change);
+ return change;
+ }
+
+ int maxSize() {
+ return maxSize;
+ }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+/** Tracks a window size together with an open/closed flag that {@link #change(int)} keeps up to date.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class WindowLimiter {
+
+ private int opensAt = 1; // size a closed window must reach before it opens
+ private int size; // current window size
+ private boolean closed;
+
+ public WindowLimiter() {
+ this.closed = true; // a new limiter starts closed
+ }
+
+ int size() {
+ return size;
+ }
+
+ WindowLimiter size(int size) { // sets the size directly; the closed flag is not re-evaluated here
+ this.size = size;
+ return this;
+ }
+
+ public boolean isOpen() {
+ return !closed;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void change(int change) { // adjusts size by change, then opens or closes the window if a threshold was crossed
+ size += change;
+ if( change > 0 && closed && size >= opensAt) {
+ closed = false; // a positive change brought size up to opensAt: open
+ } else if( change < 0 && !closed && size <= 0) {
+ closed = true; // a negative change used up the window: close
+ }
+ }
+
+}
\ No newline at end of file