You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/02/13 22:56:51 UTC
svn commit: r1243706 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/vm/
test/java/org/apache/activemq/transport/vm/
Author: tabish
Date: Mon Feb 13 21:56:50 2012
New Revision: 1243706
URL: http://svn.apache.org/viewvc?rev=1243706&view=rev
Log:
Refactoed the VMTransport for: https://issues.apache.org/jira/browse/AMQ-3684
Adds a test for basic functionality tests on the VMTransport.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1243706&r1=1243705&r2=1243706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Mon Feb 13 21:56:50 2012
@@ -19,9 +19,11 @@ package org.apache.activemq.transport.vm
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
@@ -32,6 +34,8 @@ import org.apache.activemq.transport.Tra
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
/**
* A Transport implementation that uses direct method invocations.
*/
@@ -39,6 +43,8 @@ public class VMTransport implements Tran
private static final Object DISCONNECT = new Object();
private static final AtomicLong NEXT_ID = new AtomicLong(0);
+
+ // Transport Configuration
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean marshal;
@@ -47,19 +53,17 @@ public class VMTransport implements Tran
protected int asyncQueueDepth = 2000;
protected final URI location;
protected final long id;
- protected LinkedBlockingQueue<Object> messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
- protected final AtomicBoolean stopping = new AtomicBoolean();
+ // Implementation
+ private LinkedBlockingQueue<Object> messageQueue;
+ private TaskRunner taskRunner;
+
+ // Transport State
protected final AtomicBoolean started = new AtomicBoolean();
- protected final AtomicBoolean starting = new AtomicBoolean();
protected final AtomicBoolean disposed = new AtomicBoolean();
- // thread can be eager, so initialisation needs to be last so that partial state is not visible
- protected TaskRunner taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
-
private volatile int receiveCounter;
-
public VMTransport(URI location) {
this.location = location;
this.id = NEXT_ID.getAndIncrement();
@@ -70,106 +74,138 @@ public class VMTransport implements Tran
}
public void oneway(Object command) throws IOException {
+
if (disposed.get()) {
throw new TransportDisposedIOException("Transport disposed.");
}
+
if (peer == null) {
throw new IOException("Peer not connected.");
}
- TransportListener transportListener = null;
try {
- if (peer.disposed.get() || peer.stopping.get()) {
+
+ if (peer.disposed.get()) {
throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
}
- if (peer.started.get()) {
- if (peer.async) {
- peer.messageQueue.put(command);
- peer.wakeup();
- } else {
- transportListener = peer.transportListener;
- }
- } else {
- peer.messageQueue.put(command);
- synchronized (peer.starting) {
- if (peer.started.get() && !peer.messageQueue.isEmpty()) {
- // we missed the pending dispatch during start
- if (peer.async) {
- peer.wakeup();
- } else {
- transportListener = peer.transportListener;
- }
- }
- }
+ if (peer.async || !peer.started.get()) {
+ peer.getMessageQueue().put(command);
+ peer.wakeup();
+ return;
}
+
} catch (InterruptedException e) {
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
iioe.initCause(e);
throw iioe;
}
- dispatch(peer, transportListener, command);
- }
- public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
- if (transportListener != null) {
- if (command == DISCONNECT) {
- transportListener.onException(
- new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
- } else {
- transport.receiveCounter++;
- transportListener.onCommand(command);
- }
- }
+ dispatch(peer, peer.messageQueue, command);
}
- public void start() throws Exception {
+ public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
+ TransportListener transportListener = transport.getTransportListener();
+ if (transportListener != null) {
+ synchronized (started) {
- if (starting.compareAndSet(false, true)) {
+ // Ensure that no additional commands entered the queue in the small time window
+ // before the start method locks the dispatch lock and the oneway method was in
+ // an put operation.
+ while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
+ doDispatch(transport, transportListener, pending.poll());
+ }
- if (transportListener == null) {
- throw new IOException("TransportListener not set.");
+ // We are now in sync mode and won't enqueue any more commands to the target
+ // transport so lets clean up its resources.
+ transport.messageQueue = null;
+
+ // Don't dispatch if either end was disposed already.
+ if (command != null && !this.disposed.get() && !transport.isDisposed()) {
+ doDispatch(transport, transportListener, command);
+ }
}
+ }
+ }
- // ensure there is no missed dispatch during start, sync with oneway
- synchronized (peer.starting) {
- Object command;
- while ((command = messageQueue.poll()) != null && !stopping.get()) {
- dispatch(this, transportListener, command);
- }
+ public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
+ if (command == DISCONNECT) {
+ transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
+ } else {
+ transport.receiveCounter++;
+ transportListener.onCommand(command);
+ }
+ }
- if (!disposed.get()) {
+ public void start() throws Exception {
- started.set(true);
+ if (transportListener == null) {
+ throw new IOException("TransportListener not set.");
+ }
- if (async) {
- taskRunner.wakeup();
- } else {
- messageQueue.clear();
- messageQueue = null;
- taskRunner.shutdown();
- taskRunner = null;
+ // If we are not in async mode we lock the dispatch lock here and then start to
+ // prevent any sync dispatches from occurring until we dispatch the pending messages
+ // to maintain delivery order. When async this happens automatically so just set
+ // started and wakeup the task runner.
+ if (!async) {
+ synchronized (started) {
+ if (started.compareAndSet(false, true)) {
+ LinkedBlockingQueue<Object> mq = getMessageQueue();
+ Object command;
+ while ((command = mq.poll()) != null && !disposed.get() ) {
+ receiveCounter++;
+ doDispatch(this, transportListener, command);
}
}
}
+ } else {
+ if (started.compareAndSet(false, true)) {
+ wakeup();
+ }
}
}
public void stop() throws Exception {
+ // Only need to do this once, all future oneway calls will now
+ // fail as will any asnyc jobs in the task runner.
if (disposed.compareAndSet(false, true)) {
- stopping.set(true);
- // let the peer know that we are disconnecting..
+
+ TaskRunner tr = taskRunner;
+ LinkedBlockingQueue<Object> mq = this.messageQueue;
+
+ taskRunner = null;
+ messageQueue = null;
+
+ if (mq != null) {
+ mq.clear();
+ }
+
+ // Allow pending deliveries to finish up, but don't wait
+ // forever in case of an stalled onCommand.
+ if (tr != null) {
+ try {
+ tr.shutdown(TimeUnit.SECONDS.toMillis(1));
+ } catch(Exception e) {
+ }
+ }
+
+ // let the peer know that we are disconnecting after attempting
+ // to cleanly shutdown the async tasks so that this is the last
+ // command it see's.
try {
peer.transportListener.onCommand(new ShutdownInfo());
} catch (Exception ignore) {
}
+ }
+ }
- if (messageQueue != null) {
- messageQueue.clear();
- }
- if (taskRunner != null) {
- taskRunner.shutdown(1000);
- taskRunner = null;
+ protected void wakeup() {
+ if (async && started.get()) {
+ try {
+ getTaskRunner().wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (TransportDisposedIOException e) {
}
}
}
@@ -179,20 +215,27 @@ public class VMTransport implements Tran
*/
public boolean iterate() {
- if (disposed.get() || stopping.get()) {
+ final TransportListener tl = transportListener;
+
+ LinkedBlockingQueue<Object> mq;
+ try {
+ mq = getMessageQueue();
+ } catch (TransportDisposedIOException e) {
return false;
}
- LinkedBlockingQueue<Object> mq = messageQueue;
Object command = mq.poll();
- if (command != null) {
- if (command == DISCONNECT) {
- transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
+ if (command != null && !disposed.get()) {
+ if( command == DISCONNECT ) {
+ tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else {
- transportListener.onCommand(command);
+ tl.onCommand(command);
}
- return !mq.isEmpty();
+ return !mq.isEmpty() && !disposed.get();
} else {
+ if(disposed.get()) {
+ mq.clear();
+ }
return false;
}
}
@@ -201,6 +244,48 @@ public class VMTransport implements Tran
this.transportListener = commandListener;
}
+ public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
+ synchronized (this) {
+ if (messageQueue == null) {
+ messageQueue = asyncQueue;
+ }
+ }
+ }
+
+ public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
+ LinkedBlockingQueue<Object> result = messageQueue;
+ if (result == null) {
+ synchronized (this) {
+ result = messageQueue;
+ if (result == null) {
+ if (disposed.get()) {
+ throw new TransportDisposedIOException("The Transport has been disposed");
+ }
+
+ messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
+ }
+ }
+ }
+ return result;
+ }
+
+ protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
+ TaskRunner result = taskRunner;
+ if (result == null) {
+ synchronized (this) {
+ result = taskRunner;
+ if (result == null) {
+ if (disposed.get()) {
+ throw new TransportDisposedIOException("The Transport has been disposed");
+ }
+
+ taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
+ }
+ }
+ }
+ return result;
+ }
+
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
@@ -280,16 +365,6 @@ public class VMTransport implements Tran
this.asyncQueueDepth = asyncQueueDepth;
}
- protected void wakeup() {
- if (async) {
- try {
- taskRunner.wakeup();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
public boolean isFaultTolerant() {
return false;
}
@@ -299,11 +374,11 @@ public class VMTransport implements Tran
}
public boolean isConnected() {
- return started.get();
+ return !disposed.get();
}
public void reconnect(URI uri) throws IOException {
- throw new IOException("reconnection Not supported by this transport.");
+ throw new IOException("Transport reconnect is not supported");
}
public boolean isReconnectSupported() {
@@ -314,8 +389,8 @@ public class VMTransport implements Tran
return false;
}
- public void updateURIs(boolean reblance, URI[] uris) throws IOException {
- throw new IOException("Not supported");
+ public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+ throw new IOException("URI update feature not supported");
}
public int getReceiveCounter() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java?rev=1243706&r1=1243705&r2=1243706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java Mon Feb 13 21:56:50 2012
@@ -75,13 +75,12 @@ public class VMTransportServer implement
connectionCount.incrementAndGet();
VMTransport client = new VMTransport(location) {
public void stop() throws Exception {
- if (stopping.compareAndSet(false, true) && !disposed.get()) {
- super.stop();
- if (connectionCount.decrementAndGet() == 0
- && disposeOnDisconnect) {
- VMTransportServer.this.stop();
- }
- }
+ if (!disposed.get()) {
+ super.stop();
+ if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
+ VMTransportServer.this.stop();
+ }
+ }
};
};
@@ -94,7 +93,7 @@ public class VMTransportServer implement
/**
* Configure transport
- *
+ *
* @param transport
* @return the Transport
*/
@@ -106,7 +105,7 @@ public class VMTransportServer implement
/**
* Set the Transport accept listener for new Connections
- *
+ *
* @param acceptListener
*/
public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
@@ -134,7 +133,7 @@ public class VMTransportServer implement
public InetSocketAddress getSocketAddress() {
return null;
}
-
+
public int getConnectionCount() {
return connectionCount.intValue();
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java?rev=1243706&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java Mon Feb 13 21:56:50 2012
@@ -0,0 +1,717 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.vm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.command.BaseCommand;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.TransportDisposedIOException;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VMTransportThreadSafeTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
+
+ private final static String location1 = "vm://transport1";
+ private final static String location2 = "vm://transport2";
+
+ private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue<DummyCommand>();
+ private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue<DummyCommand>();
+
+ private class DummyCommand extends BaseCommand {
+
+ public final int sequenceId;
+
+ public DummyCommand() {
+ this.sequenceId = 0;
+ }
+
+ public DummyCommand(int id) {
+ this.sequenceId = id;
+ }
+
+ @Override
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return null;
+ }
+
+ @Override
+ public byte getDataStructureType() {
+ return 42;
+ }
+ }
+
+ private class VMTestTransportListener implements TransportListener {
+
+ protected final Queue<DummyCommand> received;
+
+ public boolean shutdownReceived = false;
+
+ public VMTestTransportListener(Queue<DummyCommand> receiveQueue) {
+ this.received = receiveQueue;
+ }
+
+ @Override
+ public void onCommand(Object command) {
+
+ if (command instanceof ShutdownInfo) {
+ shutdownReceived = true;
+ } else {
+ received.add((DummyCommand) command);
+ }
+ }
+
+ @Override
+ public void onException(IOException error) {
+ }
+
+ @Override
+ public void transportInterupted() {
+ }
+
+ @Override
+ public void transportResumed() {
+ }
+ }
+
+ private class SlowVMTestTransportListener extends VMTestTransportListener {
+
+ private final TimeUnit delayUnit;
+ private final long delay;
+
+ public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
+ this(receiveQueue, 10, TimeUnit.MILLISECONDS);
+ }
+
+ public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue, long delay, TimeUnit delayUnit) {
+ super(receiveQueue);
+
+ this.delay = delay;
+ this.delayUnit = delayUnit;
+ }
+
+ @Override
+ public void onCommand(Object command) {
+ super.onCommand(command);
+ try {
+ delayUnit.sleep(delay);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private class GatedVMTestTransportListener extends VMTestTransportListener {
+
+ private final CountDownLatch gate;
+
+ public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
+ this(receiveQueue, new CountDownLatch(1));
+ }
+
+ public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue, CountDownLatch gate) {
+ super(receiveQueue);
+
+ this.gate = gate;
+ }
+
+ @Override
+ public void onCommand(Object command) {
+ super.onCommand(command);
+ try {
+ gate.await();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> queue) {
+ int lastSequenceId = 0;
+ for(DummyCommand command : queue) {
+ int id = command.sequenceId;
+ assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId);
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ localReceived.clear();
+ remoteReceived.clear();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test(timeout=60000)
+ public void testStartWthoutListenerIOE() throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ remote.setTransportListener(new VMTestTransportListener(localReceived));
+
+ try {
+ local.start();
+ fail("Should have thrown an IOExcoption");
+ } catch (IOException e) {
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testOnewayOnStoppedTransportTDE() throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new VMTestTransportListener(remoteReceived));
+
+ local.start();
+ local.stop();
+
+ try {
+ local.oneway(new DummyCommand());
+ fail("Should have thrown a TransportDisposedException");
+ } catch(TransportDisposedIOException e) {
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testStopSendsShutdownToPeer() throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(remoteListener);
+
+ local.start();
+ local.stop();
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteListener.shutdownReceived;
+ }
+ }));
+ }
+
+ @Test(timeout=60000)
+ public void testMultipleStartsAndStops() throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new VMTestTransportListener(remoteReceived));
+
+ local.start();
+ remote.start();
+
+ local.start();
+ remote.start();
+
+ for(int i = 0; i < 100; ++i) {
+ local.oneway(new DummyCommand());
+ }
+
+ for(int i = 0; i < 100; ++i) {
+ remote.oneway(new DummyCommand());
+ }
+
+ local.start();
+ remote.start();
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteReceived.size() == 100;
+ }
+ }));
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return localReceived.size() == 100;
+ }
+ }));
+
+ local.stop();
+ local.stop();
+ remote.stop();
+ remote.stop();
+ }
+
+ @Test(timeout=60000)
+ public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
+ doTestStartWithPeerNotStartedEnqueusCommands(false);
+ }
+
+ private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ remote.setAsync(async);
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new VMTestTransportListener(remoteReceived));
+
+ local.start();
+
+ for(int i = 0; i < 100; ++i) {
+ local.oneway(new DummyCommand());
+ }
+
+ assertEquals(100, remote.getMessageQueue().size());
+
+ remote.start();
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteReceived.size() == 100;
+ }
+ }));
+
+ local.stop();
+ remote.stop();
+ }
+
+ @Test(timeout=60000)
+ public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
+ doTestBlockedOnewayEnqeueAandStopTransport(true);
+ }
+
+ @Test(timeout=60000)
+ public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
+ doTestBlockedOnewayEnqeueAandStopTransport(false);
+ }
+
+ private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ final AtomicInteger sequenceId = new AtomicInteger();
+
+ remote.setAsync(async);
+ remote.setAsyncQueueDepth(99);
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new VMTestTransportListener(remoteReceived));
+
+ local.start();
+
+ Thread t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ for(int i = 0; i < 100; ++i) {
+ try {
+ local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
+ } catch (Exception e) {
+ }
+ }
+
+ }
+ });
+ t.start();
+
+ LOG.debug("Started async delivery, wait for remote's queue to fill up");
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remote.getMessageQueue().remainingCapacity() == 0;
+ }
+ }));
+
+ LOG.debug("Remote messageQ is full, start it and stop all");
+
+ remote.start();
+ local.stop();
+ remote.stop();
+ }
+
+ @Test(timeout=60000)
+ public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ final AtomicInteger sequenceId = new AtomicInteger();
+
+ remote.setAsync(true);
+ remote.setAsyncQueueDepth(2);
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new GatedVMTestTransportListener(remoteReceived));
+
+ local.start();
+ remote.start();
+
+ Thread t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ for(int i = 0; i < 3; ++i) {
+ try {
+ local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
+ } catch (Exception e) {
+ }
+ }
+
+ }
+ });
+ t.start();
+
+ LOG.debug("Started async delivery, wait for remote's queue to fill up");
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remote.getMessageQueue().remainingCapacity() == 0;
+ }
+ }));
+
+ LOG.debug("Starting async gate open.");
+ Thread gateman = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ ((GatedVMTestTransportListener) remote.getTransportListener()).gate.countDown();
+ }
+ });
+ gateman.start();
+
+ remote.stop();
+ local.stop();
+
+ assertEquals(1, remoteReceived.size());
+ assertMessageAreOrdered(remoteReceived);
+ }
+
+ @Test(timeout=60000)
+ public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
+ // In the async case the iterate method should see that we are stopping and
+ // drop out before we dispatch all the messages but it should get at least 49 since
+ // the stop thread waits 500 mills and the listener is waiting 10 mills on each receive.
+ doTestStopWhileStartingWithNoAsyncLimit(true, 49);
+ }
+
+ @Test(timeout=60000)
+ public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
+ // In the non-async case the start dispatches all messages up front and then continues on
+ doTestStopWhileStartingWithNoAsyncLimit(false, 100);
+ }
+
+ private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ remote.setAsync(async);
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new SlowVMTestTransportListener(remoteReceived));
+
+ local.start();
+
+ for(int i = 0; i < 100; ++i) {
+ local.oneway(new DummyCommand(i));
+ }
+
+ Thread t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(500);
+ remote.stop();
+ } catch (Exception e) {
+ }
+ }
+ });
+
+ remote.start();
+
+ t.start();
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteReceived.size() >= expect;
+ }
+ }));
+
+ LOG.debug("Remote listener received " + remoteReceived.size() + " messages");
+
+ local.stop();
+
+ assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remote.isDisposed();
+ }
+ }));
+ }
+
+ @Test(timeout=120000)
+ public void TestTwoWayMessageThroughPutSync() throws Exception {
+
+ long totalTimes = 0;
+ final long executions = 20;
+
+ for (int i = 0; i < 20; ++i) {
+ totalTimes += doTestTwoWayMessageThroughPut(false);
+ }
+
+ LOG.info("Total time of one way sync send throughput test: " + (totalTimes/executions) + "ms");
+ }
+
+ @Test(timeout=120000)
+ public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
+
+ long totalTimes = 0;
+ final long executions = 50;
+
+ for (int i = 0; i < executions; ++i) {
+ totalTimes += doTestTwoWayMessageThroughPut(false);
+ }
+
+ LOG.info("Total time of one way async send throughput test: " + (totalTimes/executions) + "ms");
+ }
+
+ private long doTestTwoWayMessageThroughPut(boolean async) throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ final AtomicInteger sequenceId = new AtomicInteger();
+
+ remote.setAsync(async);
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new VMTestTransportListener(remoteReceived));
+
+ final int messageCount = 200000;
+
+ local.start();
+ remote.start();
+
+ long startTime = System.currentTimeMillis();
+
+ Thread localSend = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ for(int i = 0; i < messageCount; ++i) {
+ try {
+ local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
+ } catch (Exception e) {
+ }
+ }
+
+ }
+ });
+
+ Thread remoteSend = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ for(int i = 0; i < messageCount; ++i) {
+ try {
+ remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
+ } catch (Exception e) {
+ }
+ }
+
+ }
+ });
+
+ localSend.start();
+ remoteSend.start();
+
+ // Wait for both to finish and then check that each side go the correct amount
+ localSend.join();
+ remoteSend.join();
+
+ long endTime = System.currentTimeMillis();
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteReceived.size() == messageCount;
+ }
+ }));
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return localReceived.size() == messageCount;
+ }
+ }));
+
+ LOG.debug("All messages sent,stop all");
+
+ local.stop();
+ remote.stop();
+
+ localReceived.clear();
+ remoteReceived.clear();
+
+ return endTime - startTime;
+ }
+
+ @Test(timeout=120000)
+ public void TestOneWayMessageThroughPutSync() throws Exception {
+
+ long totalTimes = 0;
+ final long executions = 30;
+
+ for (int i = 0; i < executions; ++i) {
+ totalTimes += doTestOneWayMessageThroughPut(false);
+ }
+
+ LOG.info("Total time of one way sync send throughput test: " + (totalTimes/executions) + "ms");
+ }
+
+ @Test(timeout=120000)
+ public void TestOneWayMessageThroughPutAsnyc() throws Exception {
+
+ long totalTimes = 0;
+ final long executions = 20;
+
+ for (int i = 0; i < 20; ++i) {
+ totalTimes += doTestOneWayMessageThroughPut(true);
+ }
+
+ LOG.info("Total time of one way async send throughput test: " + (totalTimes/executions) + "ms");
+ }
+
+ private long doTestOneWayMessageThroughPut(boolean async) throws Exception {
+
+ final VMTransport local = new VMTransport(new URI(location1));
+ final VMTransport remote = new VMTransport(new URI(location2));
+
+ final AtomicInteger sequenceId = new AtomicInteger();
+
+ remote.setAsync(async);
+
+ local.setPeer(remote);
+ remote.setPeer(local);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new VMTestTransportListener(remoteReceived));
+
+ final int messageCount = 100000;
+
+ local.start();
+ remote.start();
+
+ long startTime = System.currentTimeMillis();
+
+ Thread localSend = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ for(int i = 0; i < messageCount; ++i) {
+ try {
+ local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
+ } catch (Exception e) {
+ }
+ }
+
+ }
+ });
+
+ localSend.start();
+
+ // Wait for both to finish and then check that each side go the correct amount
+ localSend.join();
+
+ long endTime = System.currentTimeMillis();
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteReceived.size() == messageCount;
+ }
+ }));
+
+ LOG.debug("All messages sent,stop all");
+
+ local.stop();
+ remote.stop();
+
+ localReceived.clear();
+ remoteReceived.clear();
+
+ return endTime - startTime;
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
------------------------------------------------------------------------------
svn:eol-style = native