You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:13 UTC
[04/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
deleted file mode 100644
index 2268048..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ /dev/null
@@ -1,937 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.command.BaseCommand;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.transport.FutureResponse;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.ResponseCallback;
-import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportThreadSafeTest {
-
- /** Logger shared by all tests in this class. */
- private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
-
- /** URIs handed to the two VMTransport endpoints built by each test. */
- private static final String location1 = "vm://transport1";
- private static final String location2 = "vm://transport2";
-
- /** Queues the test listeners append received commands to; both are cleared in setUp(). */
- private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue<>();
- private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue<>();
-
- /**
-  * Minimal command used as the payload in these tests; it only carries a sequence id.
-  */
- private class DummyCommand extends BaseCommand {
-
-    /** Id supplied by the sender; 0 when the no-arg constructor is used. */
-    public final int sequenceId;
-
-    public DummyCommand() {
-       this(0);
-    }
-
-    public DummyCommand(int id) {
-       sequenceId = id;
-    }
-
-    /** Always returns {@code null}; the visitor is never invoked. */
-    @Override
-    public Response visit(CommandVisitor visitor) throws Exception {
-       return null;
-    }
-
-    /** Returns the fixed type code 42. */
-    @Override
-    public byte getDataStructureType() {
-       return 42;
-    }
- }
-
- /**
-  * Listener that appends every received {@link DummyCommand} to the supplied queue and
-  * records whether a {@link ShutdownInfo} has been delivered.
-  */
- private class VMTestTransportListener implements TransportListener {
-
-    /** Destination for every non-shutdown command passed to {@link #onCommand(Object)}. */
-    protected final Queue<DummyCommand> received;
-
-    // volatile: written in onCommand() and polled by the test thread through
-    // Wait.waitFor(), so the write has to be visible if delivery happens on another thread.
-    public volatile boolean shutdownReceived = false;
-
-    public VMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-       this.received = receiveQueue;
-    }
-
-    /**
-     * Flags a shutdown notification, or stores the command in {@link #received}.
-     *
-     * @param command a {@link ShutdownInfo} or a {@link DummyCommand}; anything else
-     *                fails with a {@link ClassCastException}
-     */
-    @Override
-    public void onCommand(Object command) {
-       if (command instanceof ShutdownInfo) {
-          shutdownReceived = true;
-       }
-       else {
-          received.add((DummyCommand) command);
-       }
-    }
-
-    /** Intentionally a no-op. */
-    @Override
-    public void onException(IOException error) {
-    }
-
-    /** Intentionally a no-op. */
-    @Override
-    public void transportInterupted() {
-    }
-
-    /** Intentionally a no-op. */
-    @Override
-    public void transportResumed() {
-    }
- }
-
- /**
-  * Listener that stores each received {@link DummyCommand} and, when a peer transport was
-  * supplied, sends the same command back out through that transport.
-  */
- private class VMResponderTransportListener implements TransportListener {
-
-    protected final Queue<DummyCommand> received;
-
-    /** Transport used to forward received commands; may be {@code null}. */
-    private final Transport peer;
-
-    public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
-       this.received = receiveQueue;
-       this.peer = peer;
-    }
-
-    @Override
-    public void onCommand(Object command) {
-       // Shutdown notifications are neither recorded nor forwarded.
-       if (command instanceof ShutdownInfo) {
-          return;
-       }
-
-       received.add((DummyCommand) command);
-
-       if (peer == null) {
-          return;
-       }
-
-       try {
-          peer.oneway(command);
-       }
-       catch (IOException ignored) {
-          // Forwarding is best effort; a failed send is deliberately dropped.
-       }
-    }
-
-    /** Intentionally a no-op. */
-    @Override
-    public void onException(IOException error) {
-    }
-
-    /** Intentionally a no-op. */
-    @Override
-    public void transportInterupted() {
-    }
-
-    /** Intentionally a no-op. */
-    @Override
-    public void transportResumed() {
-    }
- }
-
- /**
-  * {@link VMTestTransportListener} that sleeps for a fixed delay after recording each command.
-  */
- private class SlowVMTestTransportListener extends VMTestTransportListener {
-
-    private final TimeUnit delayUnit;
-    private final long delay;
-
-    /** Creates a listener that pauses 10 milliseconds per command. */
-    public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-       this(receiveQueue, 10, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @param receiveQueue queue that receives the recorded commands
-     * @param delay        amount of time to sleep after each command
-     * @param delayUnit    unit of {@code delay}
-     */
-    public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue, long delay, TimeUnit delayUnit) {
-       super(receiveQueue);
-       this.delayUnit = delayUnit;
-       this.delay = delay;
-    }
-
-    @Override
-    public void onCommand(Object command) {
-       // Record first, then stall the delivering thread.
-       super.onCommand(command);
-       try {
-          delayUnit.sleep(delay);
-       }
-       catch (InterruptedException ignored) {
-          // An interrupt simply cuts the pause short.
-       }
-    }
- }
-
- /**
-  * {@link VMTestTransportListener} that blocks after recording each command until its
-  * gate latch has been counted down.
-  */
- private class GatedVMTestTransportListener extends VMTestTransportListener {
-
-    /** Latch awaited at the end of every onCommand() call. */
-    private final CountDownLatch gate;
-
-    /** Creates a listener guarded by a fresh single-count latch. */
-    public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-       this(receiveQueue, new CountDownLatch(1));
-    }
-
-    public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue, CountDownLatch gate) {
-       super(receiveQueue);
-       this.gate = gate;
-    }
-
-    @Override
-    public void onCommand(Object command) {
-       // The command is recorded before the delivering thread parks on the gate.
-       super.onCommand(command);
-       try {
-          gate.await();
-       }
-       catch (InterruptedException ignored) {
-          // An interrupt releases the delivering thread without opening the gate.
-       }
-    }
- }
-
- /**
-  * Asserts that the sequence ids of the commands in {@code queue} are strictly increasing
-  * (and greater than zero) in iteration order.
-  *
-  * @param queue the received commands to check; an empty queue passes trivially
-  */
- private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> queue) {
-    int lastSequenceId = 0;
-    for (DummyCommand command : queue) {
-       int id = command.sequenceId;
-       assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId);
-       // Carry the id forward so each element is compared with its predecessor rather than with 0.
-       lastSequenceId = id;
-    }
- }
-
- /** Empties both receive queues so every test starts without leftover commands. */
- @Before
- public void setUp() throws Exception {
-    remoteReceived.clear();
-    localReceived.clear();
- }
-
- /** No per-test cleanup is performed; each test stops the transports it created. */
- @After
- public void tearDown() throws Exception {
-    // intentionally empty
- }
-
- /**
-  * Starting a transport that has no listener of its own (only its peer has one) is
-  * expected to fail with an {@link IOException}.
-  */
- @Test(timeout = 60000)
- public void testStartWthoutListenerIOE() throws Exception {
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    // Only the remote side gets a listener; 'local' is deliberately left without one.
-    remote.setTransportListener(new VMTestTransportListener(localReceived));
-
-    IOException raised = null;
-    try {
-       local.start();
-    }
-    catch (IOException expected) {
-       raised = expected;
-    }
-
-    if (raised == null) {
-       fail("Should have thrown an IOExcoption");
-    }
- }
-
- /**
-  * A oneway send on a transport that was started and then stopped is expected to fail
-  * with a {@link TransportDisposedIOException}.
-  */
- @Test(timeout = 60000)
- public void testOnewayOnStoppedTransportTDE() throws Exception {
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-    local.start();
-    local.stop();
-
-    TransportDisposedIOException raised = null;
-    try {
-       local.oneway(new DummyCommand());
-    }
-    catch (TransportDisposedIOException expected) {
-       raised = expected;
-    }
-
-    if (raised == null) {
-       fail("Should have thrown a TransportDisposedException");
-    }
- }
-
- @Test(timeout = 60000)
- public void testStopSendsShutdownToPeer() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(remoteListener);
-
- local.start();
- local.stop();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteListener.shutdownReceived;
- }
- }));
- }
-
- /**
-  * Issues an async request through a {@link ResponseCorrelator} wrapped around the local
-  * transport, stops the remote side, and expects the pending request to complete with an
-  * {@link ExceptionResponse} carrying a {@link TransportDisposedIOException}.
-  */
- @Test(timeout = 60000)
- public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
-
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-    remote.setTransportListener(remoteListener);
-    remote.start();
-
-    // Single-element array so the anonymous callback can hand the response back.
-    final Response[] answer = new Response[1];
-    ResponseCorrelator responseCorrelator = new ResponseCorrelator(local);
-    responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived));
-    responseCorrelator.start();
-    responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() {
-       @Override
-       public void onCompletion(FutureResponse resp) {
-          try {
-             answer[0] = resp.getResult();
-          }
-          catch (IOException e) {
-             // Route the failure through the test logger instead of printing to stderr.
-             LOG.error("Failed to read the result of the pending request", e);
-          }
-       }
-    });
-
-    // simulate broker stop
-    remote.stop();
-
-    assertTrue(Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          LOG.info("answer: " + answer[0]);
-          return answer[0] instanceof ExceptionResponse && ((ExceptionResponse) answer[0]).getException() instanceof TransportDisposedIOException;
-       }
-    }));
-
-    local.stop();
- }
-
- @Test(timeout = 60000)
- public void testMultipleStartsAndStops() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
- local.start();
- remote.start();
-
- local.start();
- remote.start();
-
- for (int i = 0; i < 100; ++i) {
- local.oneway(new DummyCommand());
- }
-
- for (int i = 0; i < 100; ++i) {
- remote.oneway(new DummyCommand());
- }
-
- local.start();
- remote.start();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() == 100;
- }
- }));
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return localReceived.size() == 100;
- }
- }));
-
- local.stop();
- local.stop();
- remote.stop();
- remote.stop();
- }
-
- /** Runs the "peer not started" enqueue scenario with the remote transport in non-async mode. */
- @Test(timeout = 60000)
- public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
-    final boolean async = false;
-    doTestStartWithPeerNotStartedEnqueusCommands(async);
- }
-
- /**
-  * Sends 100 commands while the remote transport is not yet started, checks that they sit
-  * in the remote's message queue, then starts the remote and waits for all 100 to be
-  * delivered to its listener.
-  *
-  * @param async value passed to {@code remote.setAsync}
-  */
- private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception {
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    remote.setAsync(async);
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-    // Only the sending side is running at this point.
-    local.start();
-
-    for (int sent = 0; sent < 100; ++sent) {
-       local.oneway(new DummyCommand());
-    }
-
-    int queued = remote.getMessageQueue().size();
-    assertEquals(100, queued);
-
-    remote.start();
-
-    Wait.Condition remoteGotAll = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remoteReceived.size() == 100;
-       }
-    };
-    assertTrue(Wait.waitFor(remoteGotAll));
-
-    local.stop();
-    remote.stop();
- }
-
- /** Runs the blocked-enqueue-then-stop scenario with the remote transport in async mode. */
- @Test(timeout = 60000)
- public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
-    final boolean async = true;
-    doTestBlockedOnewayEnqeueAandStopTransport(async);
- }
-
- /** Runs the blocked-enqueue-then-stop scenario with the remote transport in non-async mode. */
- @Test(timeout = 60000)
- public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
-    final boolean async = false;
-    doTestBlockedOnewayEnqeueAandStopTransport(async);
- }
-
- /**
-  * Limits the remote queue depth to 99, has a background thread attempt 100 oneway sends
-  * while the remote is not started, waits until the remote queue reports no remaining
-  * capacity, then starts the remote and stops both sides.
-  *
-  * @param async value passed to {@code remote.setAsync}
-  */
- private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception {
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    final AtomicInteger sequenceId = new AtomicInteger();
-
-    remote.setAsync(async);
-    remote.setAsyncQueueDepth(99);
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-    local.start();
-
-    Runnable sender = new Runnable() {
-       @Override
-       public void run() {
-          for (int attempt = 0; attempt < 100; ++attempt) {
-             try {
-                local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-             }
-             catch (Exception ignored) {
-                // Failures are expected once the transports are stopped; keep looping.
-             }
-          }
-       }
-    };
-    Thread t = new Thread(sender);
-    t.start();
-
-    LOG.debug("Started async delivery, wait for remote's queue to fill up");
-
-    Wait.Condition queueFull = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remote.getMessageQueue().remainingCapacity() == 0;
-       }
-    };
-    assertTrue(Wait.waitFor(queueFull));
-
-    LOG.debug("Remote messageQ is full, start it and stop all");
-
-    remote.start();
-    local.stop();
-    remote.stop();
- }
-
- /**
-  * With an async remote whose queue depth is 2 and whose listener blocks on a gate, a
-  * background thread attempts three sends. Once the remote queue is full both transports
-  * are stopped while a helper thread opens the gate; exactly one command is expected to
-  * have reached the remote listener.
-  */
- @Test(timeout = 60000)
- public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    final AtomicInteger sequenceId = new AtomicInteger();
-
-    remote.setAsync(true);
-    remote.setAsyncQueueDepth(2);
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new GatedVMTestTransportListener(remoteReceived));
-
-    local.start();
-    remote.start();
-
-    Runnable sender = new Runnable() {
-       @Override
-       public void run() {
-          for (int attempt = 0; attempt < 3; ++attempt) {
-             try {
-                local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-             }
-             catch (Exception ignored) {
-                // Sends may fail once the transports are stopped; keep looping.
-             }
-          }
-       }
-    };
-    Thread t = new Thread(sender);
-    t.start();
-
-    LOG.debug("Started async delivery, wait for remote's queue to fill up");
-    Wait.Condition queueFull = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remote.getMessageQueue().remainingCapacity() == 0;
-       }
-    };
-    assertTrue(Wait.waitFor(queueFull));
-
-    LOG.debug("Starting async gate open.");
-    Runnable gateOpener = new Runnable() {
-       @Override
-       public void run() {
-          try {
-             Thread.sleep(100);
-          }
-          catch (InterruptedException ignored) {
-             // Open the gate immediately if interrupted.
-          }
-          GatedVMTestTransportListener gated = (GatedVMTestTransportListener) remote.getTransportListener();
-          gated.gate.countDown();
-       }
-    };
-    Thread gateman = new Thread(gateOpener);
-    gateman.start();
-
-    remote.stop();
-    local.stop();
-
-    assertEquals(1, remoteReceived.size());
-    assertMessageAreOrdered(remoteReceived);
- }
-
- /**
-  * Async variant of the stop-while-starting scenario.
-  */
- @Test(timeout = 60000)
- public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
-    // In the async case delivery should notice the stop and drop out before every message
-    // has been dispatched. The stop thread in the helper sleeps 1000 ms and the slow
-    // listener pauses 10 ms per command, so 49 is used as a conservative lower bound.
-    final int minimumExpected = 49;
-    doTestStopWhileStartingWithNoAsyncLimit(true, minimumExpected);
- }
-
- /**
-  * Non-async variant of the stop-while-starting scenario.
-  */
- @Test(timeout = 60000)
- public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
-    // In the non-async case start() dispatches all queued messages up front, so every one
-    // of the 100 commands is expected.
-    final int allCommands = 100;
-    doTestStopWhileStartingWithNoAsyncLimit(false, allCommands);
- }
-
- /**
-  * Queues 100 commands for a not-yet-started remote whose listener pauses 10 ms per
-  * command, starts the remote, stops it from a second thread after 1000 ms, and checks
-  * that at least {@code expect} commands were delivered and that the remote ends up disposed.
-  *
-  * @param async  value passed to {@code remote.setAsync}
-  * @param expect minimum number of commands the remote listener must receive
-  */
- private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception {
-
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    remote.setAsync(async);
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new SlowVMTestTransportListener(remoteReceived));
-
-    local.start();
-
-    for (int i = 0; i < 100; ++i) {
-       local.oneway(new DummyCommand(i));
-    }
-
-    Thread t = new Thread(new Runnable() {
-
-       @Override
-       public void run() {
-          try {
-             Thread.sleep(1000);
-             remote.stop();
-          }
-          catch (Exception ignored) {
-             // Best effort: a failure to stop shows up in the isDisposed() check below.
-          }
-       }
-    });
-
-    remote.start();
-
-    t.start();
-
-    boolean receivedEnough = Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remoteReceived.size() >= expect;
-       }
-    });
-    // Build the message after waiting so it reports the count at failure time,
-    // not the count from before the wait began.
-    assertTrue("Remote should receive: " + expect + ", commands but got: " + remoteReceived.size(), receivedEnough);
-
-    LOG.debug("Remote listener received " + remoteReceived.size() + " messages");
-
-    local.stop();
-
-    assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remote.isDisposed();
-       }
-    }));
- }
-
- /**
-  * Runs the two-way throughput scenario in sync mode 20 times and logs the mean duration
-  * per run.
-  */
- @Test(timeout = 120000)
- public void TestTwoWayMessageThroughPutSync() throws Exception {
-
-    long totalTimes = 0;
-    final long executions = 20;
-
-    // Loop on 'executions' so the divisor below always matches the number of runs.
-    for (int i = 0; i < executions; ++i) {
-       totalTimes += doTestTwoWayMessageThroughPut(false);
-    }
-
-    LOG.info("Total time of two way sync send throughput test: " + (totalTimes / executions) + "ms");
- }
-
- /**
-  * Runs the two-way throughput scenario in async mode 50 times and logs the mean duration
-  * per run.
-  */
- @Test(timeout = 120000)
- public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
-
-    long totalTimes = 0;
-    final long executions = 50;
-
-    for (int i = 0; i < executions; ++i) {
-       // Pass true: this is the async variant (false would repeat the sync test).
-       totalTimes += doTestTwoWayMessageThroughPut(true);
-    }
-
-    LOG.info("Total time of two way async send throughput test: " + (totalTimes / executions) + "ms");
- }
-
- /**
-  * Sends 200000 commands in each direction from two concurrent threads, waits until both
-  * listeners have received them all, and then stops the transports and clears the queues.
-  *
-  * @param async value passed to {@code remote.setAsync}
-  * @return milliseconds between starting the clock and both sender threads finishing
-  */
- private long doTestTwoWayMessageThroughPut(boolean async) throws Exception {
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    final AtomicInteger sequenceId = new AtomicInteger();
-
-    remote.setAsync(async);
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-    final int messageCount = 200000;
-
-    local.start();
-    remote.start();
-
-    long startTime = System.currentTimeMillis();
-
-    Runnable localToRemote = new Runnable() {
-       @Override
-       public void run() {
-          for (int sent = 0; sent < messageCount; ++sent) {
-             try {
-                local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-             }
-             catch (Exception ignored) {
-                // A failed send shows up as a short count in the waits below.
-             }
-          }
-       }
-    };
-    Thread localSend = new Thread(localToRemote);
-
-    Runnable remoteToLocal = new Runnable() {
-       @Override
-       public void run() {
-          for (int sent = 0; sent < messageCount; ++sent) {
-             try {
-                remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-             }
-             catch (Exception ignored) {
-                // A failed send shows up as a short count in the waits below.
-             }
-          }
-       }
-    };
-    Thread remoteSend = new Thread(remoteToLocal);
-
-    localSend.start();
-    remoteSend.start();
-
-    // Let both senders finish before timing stops and the received counts are checked.
-    localSend.join();
-    remoteSend.join();
-
-    long endTime = System.currentTimeMillis();
-
-    Wait.Condition remoteGotAll = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remoteReceived.size() == messageCount;
-       }
-    };
-    assertTrue(Wait.waitFor(remoteGotAll));
-
-    Wait.Condition localGotAll = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return localReceived.size() == messageCount;
-       }
-    };
-    assertTrue(Wait.waitFor(localGotAll));
-
-    LOG.debug("All messages sent,stop all");
-
-    local.stop();
-    remote.stop();
-
-    // The queues are shared fields, so reset them for the next iteration.
-    localReceived.clear();
-    remoteReceived.clear();
-
-    long elapsed = endTime - startTime;
-    return elapsed;
- }
-
- /**
-  * Runs the one-way throughput scenario in sync mode 30 times and logs the mean duration
-  * per run.
-  */
- @Test(timeout = 120000)
- public void TestOneWayMessageThroughPutSync() throws Exception {
-    final long executions = 30;
-    long totalTimes = 0;
-
-    int run = 0;
-    while (run < executions) {
-       totalTimes += doTestOneWayMessageThroughPut(false);
-       ++run;
-    }
-
-    long mean = totalTimes / executions;
-    LOG.info("Total time of one way sync send throughput test: " + mean + "ms");
- }
-
- /**
-  * Runs the one-way throughput scenario in async mode 20 times and logs the mean duration
-  * per run.
-  */
- @Test(timeout = 120000)
- public void TestOneWayMessageThroughPutAsnyc() throws Exception {
-    final long executions = 20;
-    long totalTimes = 0;
-
-    int run = 0;
-    while (run < executions) {
-       totalTimes += doTestOneWayMessageThroughPut(true);
-       ++run;
-    }
-
-    long mean = totalTimes / executions;
-    LOG.info("Total time of one way async send throughput test: " + mean + "ms");
- }
-
- /**
-  * Sends 100000 commands from local to remote on a background thread, waits until the
-  * remote listener has received them all, then stops the transports and clears the queues.
-  *
-  * @param async value passed to {@code remote.setAsync}
-  * @return milliseconds between starting the clock and the sender thread finishing
-  */
- private long doTestOneWayMessageThroughPut(boolean async) throws Exception {
-    final VMTransport local = new VMTransport(new URI(location1));
-    final VMTransport remote = new VMTransport(new URI(location2));
-
-    final AtomicInteger sequenceId = new AtomicInteger();
-
-    remote.setAsync(async);
-
-    local.setPeer(remote);
-    remote.setPeer(local);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-    final int messageCount = 100000;
-
-    local.start();
-    remote.start();
-
-    long startTime = System.currentTimeMillis();
-
-    Runnable localToRemote = new Runnable() {
-       @Override
-       public void run() {
-          for (int sent = 0; sent < messageCount; ++sent) {
-             try {
-                local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-             }
-             catch (Exception ignored) {
-                // A failed send shows up as a short count in the wait below.
-             }
-          }
-       }
-    };
-    Thread localSend = new Thread(localToRemote);
-
-    localSend.start();
-
-    // Let the sender finish before timing stops and the received count is checked.
-    localSend.join();
-
-    long endTime = System.currentTimeMillis();
-
-    Wait.Condition remoteGotAll = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remoteReceived.size() == messageCount;
-       }
-    };
-    assertTrue(Wait.waitFor(remoteGotAll));
-
-    LOG.debug("All messages sent,stop all");
-
-    local.stop();
-    remote.stop();
-
-    // The queues are shared fields, so reset them for the next iteration.
-    localReceived.clear();
-    remoteReceived.clear();
-
-    long elapsed = endTime - startTime;
-    return elapsed;
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(false, false);
- }
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(true, false);
- }
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(false, true);
- }
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(false, false);
- }
- }
-
- /**
-  * Wraps both VM transports in {@link MutexTransport}s, begins sending 200000 commands in
-  * each direction from two threads, starts the transports 10 ms later, and checks the
-  * received totals. The remote listener sends every command it receives back out through
-  * the remote transport, so the local side is expected to see twice the message count.
-  *
-  * @param localAsync  value passed to {@code vmlocal.setAsync}
-  * @param remoteAsync value passed to {@code vmremote.setAsync}
-  */
- public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
-
-    final VMTransport vmlocal = new VMTransport(new URI(location1));
-    final VMTransport vmremote = new VMTransport(new URI(location2));
-
-    final MutexTransport local = new MutexTransport(vmlocal);
-    final MutexTransport remote = new MutexTransport(vmremote);
-
-    final AtomicInteger sequenceId = new AtomicInteger();
-
-    vmlocal.setAsync(localAsync);
-    vmremote.setAsync(remoteAsync);
-
-    vmlocal.setPeer(vmremote);
-    vmremote.setPeer(vmlocal);
-
-    local.setTransportListener(new VMTestTransportListener(localReceived));
-    remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote));
-
-    final int messageCount = 200000;
-
-    Thread localSend = new Thread(new Runnable() {
-
-       @Override
-       public void run() {
-          for (int i = 0; i < messageCount; ++i) {
-             try {
-                local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-             }
-             catch (Exception ignored) {
-                // A failed send shows up as a short count in the assertions below.
-             }
-          }
-       }
-    });
-
-    Thread remoteSend = new Thread(new Runnable() {
-
-       @Override
-       public void run() {
-          for (int i = 0; i < messageCount; ++i) {
-             try {
-                remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-             }
-             catch (Exception ignored) {
-                // A failed send shows up as a short count in the assertions below.
-             }
-          }
-       }
-    });
-
-    localSend.start();
-    remoteSend.start();
-
-    // The senders get a short head start before the transports are started.
-    Thread.sleep(10);
-
-    local.start();
-    remote.start();
-
-    // Wait for both senders to finish, then check that each side got the correct amount.
-    localSend.join();
-    remoteSend.join();
-
-    boolean remoteGotAll = Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return remoteReceived.size() == messageCount;
-       }
-    });
-    // Messages are built after the wait so they report the count at failure time.
-    assertTrue("Remote should have received (" + messageCount + ") but got (" + remoteReceived.size() + ")", remoteGotAll);
-
-    boolean localGotAll = Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return localReceived.size() == messageCount * 2;
-       }
-    });
-    assertTrue("Local should have received (" + messageCount * 2 + ") but got (" + localReceived.size() + ")", localGotAll);
-
-    LOG.debug("All messages sent,stop all");
-
-    local.stop();
-    remote.stop();
-
-    // The queues are shared fields, so reset them for the next iteration.
-    localReceived.clear();
-    remoteReceived.clear();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
deleted file mode 100644
index dd14d67..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportWaitForTest {
-
- /** Logger for this test class. */
- static final Logger LOG = LoggerFactory.getLogger(VMTransportWaitForTest.class);
-
- /** waitForStart values, as appended to the broker URIs below. */
- private static final int WAIT_TIME = 20000;
- private static final int SHORT_WAIT_TIME = 5000;
-
- /** Base URI: non-persistent broker on "localhost" with create=false. */
- private static final String VM_BROKER_URI_NO_WAIT = "vm://localhost?broker.persistent=false&create=false";
-
- /** Base URI plus waitForStart set to the long wait. */
- private static final String VM_BROKER_URI_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME;
-
- /** Base URI plus waitForStart set to the short wait. */
- private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + SHORT_WAIT_TIME;
-
- /** Counted down by the client thread in testWaitFor() as soon as it begins running. */
- CountDownLatch started = new CountDownLatch(1);
- /** Counted down by the client thread in testWaitFor() once createConnection() returns. */
- CountDownLatch gotConnection = new CountDownLatch(1);
-
- @After
- public void after() throws IOException {
- BrokerRegistry.getInstance().unbind("localhost");
- }
-
- /**
-  * Verifies that a client using waitForStart blocks until a broker is started: a
-  * connection attempt without waitForStart must fail, a background client with
-  * waitForStart must still be waiting after two seconds, and it must obtain its
-  * connection within five seconds of the broker being started.
-  */
- @Test(timeout = 90000)
- public void testWaitFor() throws Exception {
-    try {
-       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT));
-       cf.createConnection();
-       fail("expect broker not exist exception");
-    }
-    catch (JMSException expectedOnNoBrokerAndNoCreate) {
-    }
-
-    // spawn a thread that will wait for an embedded broker to start via
-    // vm://..
-    Thread t = new Thread("ClientConnectionThread") {
-       @Override
-       public void run() {
-          try {
-             started.countDown();
-             ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START));
-             cf.createConnection();
-             gotConnection.countDown();
-          }
-          catch (Exception e) {
-             // Calling fail() here would only end this thread; JUnit would never see it.
-             // Log instead -- the test thread fails because gotConnection is never released.
-             LOG.error("unexpected exception: " + e, e);
-          }
-       }
-    };
-    t.start();
-    assertTrue("client thread started", started.await(20, TimeUnit.SECONDS));
-    Thread.yield();
-    assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS));
-
-    BrokerService broker = new BrokerService();
-    broker.setPersistent(false);
-    try {
-       broker.start();
-       assertTrue("has got connection", gotConnection.await(5, TimeUnit.SECONDS));
-    }
-    finally {
-       // Stop the broker even when the assertion fails so it cannot leak into later tests.
-       broker.stop();
-    }
- }
-
- /**
-  * With no broker available, a connection attempt using the short waitForStart must fail
-  * with a {@link JMSException}, and only after roughly the configured wait has elapsed.
-  */
- @Test(timeout = 90000)
- public void testWaitForNoBrokerInRegistry() throws Exception {
-    final long startTime = System.currentTimeMillis();
-
-    try {
-       URI brokerUri = new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START);
-       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
-       cf.createConnection();
-       fail("expect broker not exist exception");
-    }
-    catch (JMSException expectedOnNoBrokerAndNoCreate) {
-    }
-
-    final long endTime = System.currentTimeMillis();
-    final long elapsed = endTime - startTime;
-
-    LOG.info("Total wait time was: {}", elapsed);
-    // Allow 100 ms of slack below the configured wait.
-    assertTrue(elapsed >= SHORT_WAIT_TIME - 100);
- }
-
- /**
-  * With a broker bound in the registry but never started, a connection attempt using the
-  * short waitForStart must fail with a {@link JMSException}, and only after roughly the
-  * configured wait has elapsed.
-  */
- @Test(timeout = 90000)
- public void testWaitForNotStartedButInRegistry() throws Exception {
-    BrokerService broker = new BrokerService();
-    broker.setPersistent(false);
-    // Registered under "localhost" but deliberately not started.
-    BrokerRegistry.getInstance().bind("localhost", broker);
-
-    final long startTime = System.currentTimeMillis();
-
-    try {
-       URI brokerUri = new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START);
-       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
-       cf.createConnection();
-       fail("expect broker not exist exception");
-    }
-    catch (JMSException expectedOnNoBrokerAndNoCreate) {
-    }
-
-    final long endTime = System.currentTimeMillis();
-    final long elapsed = endTime - startTime;
-
-    LOG.info("Total wait time was: {}", elapsed);
-    // Allow 100 ms of slack below the configured wait.
-    assertTrue(elapsed >= SHORT_WAIT_TIME - 100);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
deleted file mode 100644
index 2b97cff..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.bugs.embedded.ThreadExplorer;
-import org.apache.activemq.network.NetworkConnector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VmTransportNetworkBrokerTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(VmTransportNetworkBrokerTest.class);
-
- private static final String VM_BROKER_URI = "vm://localhost?create=false";
-
- CountDownLatch started = new CountDownLatch(1);
- CountDownLatch gotConnection = new CountDownLatch(1);
-
- public void testNoThreadLeak() throws Exception {
-
- // with VMConnection and simple discovery network connector
- int originalThreadCount = Thread.activeCount();
- LOG.debug(ThreadExplorer.show("threads at beginning"));
-
- BrokerService broker = new BrokerService();
- broker.setDedicatedTaskRunner(true);
- broker.setPersistent(false);
- broker.addConnector("tcp://localhost:61616");
- NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://wrongHostname1:61617,tcp://wrongHostname2:61618)?useExponentialBackOff=false");
- networkConnector.setDuplex(true);
- broker.start();
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI));
- Connection connection = cf.createConnection("system", "manager");
- connection.start();
-
- // let it settle
- TimeUnit.SECONDS.sleep(5);
-
- int threadCountAfterStart = Thread.activeCount();
- TimeUnit.SECONDS.sleep(30);
- int threadCountAfterSleep = Thread.activeCount();
-
- assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep, threadCountAfterSleep < threadCountAfterStart + 8);
-
- connection.close();
- broker.stop();
- broker.waitUntilStopped();
-
- // testNoDanglingThreadsAfterStop with tcp transport
- broker = new BrokerService();
- broker.setSchedulerSupport(true);
- broker.setDedicatedTaskRunner(true);
- broker.setPersistent(false);
- broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
- broker.start();
-
- cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
- connection = cf.createConnection("system", "manager");
- connection.start();
- connection.close();
- broker.stop();
- broker.waitUntilStopped();
-
- // let it settle
- TimeUnit.SECONDS.sleep(5);
-
- // get final threads but filter out any daemon threads that the JVM may have created.
- Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads());
- int threadCountAfterStop = threads.length;
-
- // lets see the thread counts at INFO level so they are always in the test log
- LOG.info(ThreadExplorer.show("active after stop"));
- LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop);
-
- assertTrue("Threads are leaking: " +
- ThreadExplorer.show("active after stop") +
- ". originalThreadCount=" +
- originalThreadCount +
- " threadCountAfterStop=" +
- threadCountAfterStop, threadCountAfterStop <= originalThreadCount);
- }
-
- /**
- * Filters any daemon threads from the thread list.
- *
- * Thread counts before and after the test should ideally be equal.
- * However there is no guarantee that the JVM does not create any
- * additional threads itself.
- * E.g. on Mac OSX there is a JVM internal thread called
- * "Poller SunPKCS11-Darwin" created after the test go started and
- * under the main thread group.
- * When debugging tests in Eclipse another so called "Reader" thread
- * is created by Eclipse.
- * So we cannot assume that the JVM does not create additional threads
- * during the test. However for the time being we assume that any such
- * additionally created threads are daemon threads.
- *
- * @param threads - the array of threads to parse
- * @return a new array with any daemon threads removed
- */
- public Thread[] filterDaemonThreads(Thread[] threads) throws Exception {
-
- List<Thread> threadList = new ArrayList<>(Arrays.asList(threads));
-
- // Can't use an Iterator as it would raise a
- // ConcurrentModificationException when trying to remove an element
- // from the list, so using standard walk through
- for (int i = 0; i < threadList.size(); i++) {
-
- Thread thread = threadList.get(i);
- LOG.debug("Inspecting thread " + thread.getName());
- if (thread.isDaemon()) {
- LOG.debug("Removing deamon thread.");
- threadList.remove(thread);
- Thread.sleep(100);
-
- }
- }
- LOG.debug("Converting list back to Array");
- return threadList.toArray(new Thread[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
index e78ab2f..534e68b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
@@ -390,7 +390,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
super.send(producerExchange, messageSend);
if (first.compareAndSet(false, true)) {
producerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
+ new Thread() {
@Override
public void run() {
try {
@@ -403,7 +403,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
e.printStackTrace();
}
}
- });
+ }.start();
}
}
}});
@@ -465,7 +465,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
super.send(producerExchange, messageSend);
if (first.compareAndSet(false, true)) {
producerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
+ new Thread() {
@Override
public void run() {
try {
@@ -478,7 +478,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
e.printStackTrace();
}
}
- });
+ }.start();
}
}
}});
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
new file mode 100644
index 0000000..03e0d2e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Exercises {@link LockFile#unlock()} with the two-argument constructor flag
+ * set to {@code true}: the lock file must only be removed while the lock is
+ * still considered valid.
+ */
+public class LockFileTest {
+
+    /**
+     * When the lock file is replaced behind the lock's back, keepAlive() must
+     * report the lock as lost and unlock() must leave the new file alone.
+     */
+    @Test
+    public void testNoDeleteOnUnlockIfNotLocked() throws Exception {
+
+        File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest1");
+        IOHelper.mkdirs(lockFile.getParentFile());
+        lockFile.createNewFile();
+
+        LockFile underTest = new LockFile(lockFile, true);
+
+        try {
+            underTest.lock();
+
+            lockFile.delete();
+
+            assertFalse("no longer valid", underTest.keepAlive());
+
+            // a slave gets in
+            lockFile.createNewFile();
+
+            underTest.unlock();
+
+            assertTrue("file still exists after unlock when not locked", lockFile.exists());
+        }
+        finally {
+            // the file is expected to outlive unlock(); remove it so that it does
+            // not pile up in the data directory between runs
+            lockFile.delete();
+        }
+    }
+
+    /**
+     * While the lock is intact, keepAlive() must succeed and unlock() must
+     * remove the lock file.
+     */
+    @Test
+    public void testDeleteOnUnlockIfLocked() throws Exception {
+
+        File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest2");
+        IOHelper.mkdirs(lockFile.getParentFile());
+        lockFile.createNewFile();
+
+        LockFile underTest = new LockFile(lockFile, true);
+
+        try {
+            underTest.lock();
+
+            assertTrue("valid", underTest.keepAlive());
+
+            underTest.unlock();
+
+            assertFalse("file deleted on unlock", lockFile.exists());
+        }
+        finally {
+            // only has an effect when the test failed before unlock() removed the file
+            lockFile.delete();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java
new file mode 100644
index 0000000..b01a4e1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test utility that listens on a local port and relays each accepted
+ * connection to a target URI. The relay can be paused, resumed, closed,
+ * half-closed and reopened to simulate network trouble.
+ */
+public class SocketProxy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SocketProxy.class);
+
+    public static final int ACCEPT_TIMEOUT_MILLIS = 100;
+
+    private URI proxyUrl;
+    private URI target;
+
+    private Acceptor acceptor;
+    private ServerSocket serverSocket;
+
+    private CountDownLatch closed = new CountDownLatch(1);
+
+    public final List<Bridge> connections = new LinkedList<Bridge>();
+
+    private int listenPort = 0;
+
+    private int receiveBufferSize = -1;
+
+    private boolean pauseAtStart = false;
+
+    private int acceptBacklog = 50;
+
+    /** Creates a proxy that is not listening yet; see {@link #setTarget(URI)} and {@link #open()}. */
+    public SocketProxy() throws Exception {
+    }
+
+    /** Opens a proxy for {@code uri} on an ephemeral local port. */
+    public SocketProxy(URI uri) throws Exception {
+        this(0, uri);
+    }
+
+    /** Opens a proxy for {@code uri} listening on {@code port} (0 picks a free port). */
+    public SocketProxy(int port, URI uri) throws Exception {
+        listenPort = port;
+        target = uri;
+        open();
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public void setTarget(URI tcpBrokerUri) {
+        target = tcpBrokerUri;
+    }
+
+    /**
+     * Binds the listening socket and starts the acceptor thread. The first call
+     * binds to the configured listen port and records the resulting proxy URL;
+     * later calls bind to the port of that URL again.
+     *
+     * @throws Exception if the listening socket cannot be created, configured or bound
+     */
+    public void open() throws Exception {
+        ServerSocket socket = createServerSocket(target);
+        try {
+            socket.setReuseAddress(true);
+            if (receiveBufferSize > 0) {
+                socket.setReceiveBufferSize(receiveBufferSize);
+            }
+            if (proxyUrl == null) {
+                socket.bind(new InetSocketAddress(listenPort), acceptBacklog);
+                proxyUrl = urlFromSocket(target, socket);
+            } else {
+                // honour the configured backlog on reopen as well
+                socket.bind(new InetSocketAddress(proxyUrl.getPort()), acceptBacklog);
+            }
+        } catch (Exception e) {
+            // a socket that failed to configure or bind must not be leaked
+            closeQuietly(socket);
+            throw e;
+        }
+        serverSocket = socket;
+        acceptor = new Acceptor(serverSocket, target);
+        if (pauseAtStart) {
+            acceptor.pause();
+        }
+        // install the new latch before the acceptor thread is running
+        closed = new CountDownLatch(1);
+        new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
+    }
+
+    private boolean isSsl(URI target) {
+        return "ssl".equals(target.getScheme());
+    }
+
+    private ServerSocket createServerSocket(URI target) throws Exception {
+        if (isSsl(target)) {
+            return SSLServerSocketFactory.getDefault().createServerSocket();
+        }
+        return new ServerSocket();
+    }
+
+    private Socket createSocket(URI target) throws Exception {
+        if (isSsl(target)) {
+            return SSLSocketFactory.getDefault().createSocket();
+        }
+        return new Socket();
+    }
+
+    private static void closeQuietly(Closeable resource) {
+        try {
+            resource.close();
+        } catch (IOException ignored) {
+            // best effort: the caller is already handling a failure
+        }
+    }
+
+    public URI getUrl() {
+        return proxyUrl;
+    }
+
+    /*
+     * close all proxy connections and acceptor
+     */
+    public void close() {
+        List<Bridge> connections;
+        synchronized(this.connections) {
+            connections = new ArrayList<Bridge>(this.connections);
+        }
+        LOG.info("close, numConnections=" + connections.size());
+        for (Bridge con : connections) {
+            closeConnection(con);
+        }
+        // the no-arg constructor leaves the acceptor unset until open() is called
+        if (acceptor != null) {
+            acceptor.close();
+        }
+        closed.countDown();
+    }
+
+    /*
+     * close all proxy receive connections, leaving acceptor
+     * open
+     */
+    public void halfClose() {
+        List<Bridge> connections;
+        synchronized(this.connections) {
+            connections = new ArrayList<Bridge>(this.connections);
+        }
+        LOG.info("halfClose, numConnections=" + connections.size());
+        for (Bridge con : connections) {
+            halfCloseConnection(con);
+        }
+    }
+
+    public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
+        return closed.await(timeoutSeconds, TimeUnit.SECONDS);
+    }
+
+    /*
+     * called after a close to restart the acceptor on the same port
+     */
+    public void reopen() {
+        LOG.info("reopen");
+        try {
+            open();
+        } catch (Exception e) {
+            LOG.debug("exception on reopen url:" + getUrl(), e);
+        }
+    }
+
+    /*
+     * pause accepting new connections and data transfer through existing proxy
+     * connections. All sockets remain open
+     */
+    public void pause() {
+        synchronized(connections) {
+            LOG.info("pause, numConnections=" + connections.size());
+            acceptor.pause();
+            for (Bridge con : connections) {
+                con.pause();
+            }
+        }
+    }
+
+    /*
+     * continue after pause
+     */
+    public void goOn() {
+        synchronized(connections) {
+            LOG.info("goOn, numConnections=" + connections.size());
+            for (Bridge con : connections) {
+                con.goOn();
+            }
+        }
+        acceptor.goOn();
+    }
+
+    private void closeConnection(Bridge c) {
+        try {
+            c.close();
+        } catch (Exception e) {
+            LOG.debug("exception on close of: " + c, e);
+        }
+    }
+
+    private void halfCloseConnection(Bridge c) {
+        try {
+            c.halfClose();
+        } catch (Exception e) {
+            LOG.debug("exception on half close of: " + c, e);
+        }
+    }
+
+    public boolean isPauseAtStart() {
+        return pauseAtStart;
+    }
+
+    public void setPauseAtStart(boolean pauseAtStart) {
+        this.pauseAtStart = pauseAtStart;
+    }
+
+    public int getAcceptBacklog() {
+        return acceptBacklog;
+    }
+
+    public void setAcceptBacklog(int acceptBacklog) {
+        this.acceptBacklog = acceptBacklog;
+    }
+
+    private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
+        int listenPort = serverSocket.getLocalPort();
+
+        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
+    }
+
+    /** One proxied connection: the accepted socket, the socket to the target and the two pump threads between them. */
+    public class Bridge {
+
+        private Socket receiveSocket;
+        private Socket sendSocket;
+        private Pump requestThread;
+        private Pump responseThread;
+
+        public Bridge(Socket socket, URI target) throws Exception {
+            receiveSocket = socket;
+            sendSocket = createSocket(target);
+            try {
+                if (receiveBufferSize > 0) {
+                    sendSocket.setReceiveBufferSize(receiveBufferSize);
+                }
+                sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort()));
+                linkWithThreads(receiveSocket, sendSocket);
+                LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize());
+            } catch (Exception e) {
+                // set-up failed: release the outbound socket before reporting the error
+                closeQuietly(sendSocket);
+                throw e;
+            }
+        }
+
+        public void goOn() {
+            responseThread.goOn();
+            requestThread.goOn();
+        }
+
+        public void pause() {
+            requestThread.pause();
+            responseThread.pause();
+        }
+
+        public void close() throws Exception {
+            synchronized(connections) {
+                connections.remove(this);
+            }
+            try {
+                receiveSocket.close();
+            } finally {
+                // close the outbound side even if closing the inbound side failed
+                sendSocket.close();
+            }
+        }
+
+        public void halfClose() throws Exception {
+            receiveSocket.close();
+        }
+
+        private void linkWithThreads(Socket source, Socket dest) {
+            requestThread = new Pump(source, dest);
+            requestThread.start();
+            responseThread = new Pump(dest, source);
+            responseThread.start();
+        }
+
+        /** Thread that copies bytes from one socket to the other and can be held back by pause(). */
+        public class Pump extends Thread {
+
+            protected Socket src;
+            private Socket destination;
+            private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+            public Pump(Socket source, Socket dest) {
+                super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort());
+                src = source;
+                destination = dest;
+                pause.set(new CountDownLatch(0));
+            }
+
+            public void pause() {
+                pause.set(new CountDownLatch(1));
+            }
+
+            public void goOn() {
+                pause.get().countDown();
+            }
+
+            @Override
+            public void run() {
+                byte[] buf = new byte[1024];
+                try {
+                    InputStream in = src.getInputStream();
+                    OutputStream out = destination.getOutputStream();
+                    while (true) {
+                        int len = in.read(buf);
+                        if (len == -1) {
+                            LOG.debug("read eof from:" + src);
+                            break;
+                        }
+                        pause.get().await();
+                        out.write(buf, 0, len);
+                    }
+                } catch (Exception e) {
+                    LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
+                    try {
+                        if (!receiveSocket.isClosed()) {
+                            // for halfClose, on read/write failure if we close the
+                            // remote end will see a close at the same time.
+                            close();
+                        }
+                    } catch (Exception ignore) {
+                    }
+                }
+            }
+        }
+    }
+
+    /** Accept loop run on its own thread; creates a Bridge for each accepted connection. */
+    public class Acceptor implements Runnable {
+
+        private ServerSocket socket;
+        private URI target;
+        private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+        public Acceptor(ServerSocket serverSocket, URI uri) {
+            socket = serverSocket;
+            target = uri;
+            pause.set(new CountDownLatch(0));
+            try {
+                socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
+            } catch (SocketException e) {
+                LOG.warn("could not set accept timeout on " + socket, e);
+            }
+        }
+
+        public void pause() {
+            pause.set(new CountDownLatch(1));
+        }
+
+        public void goOn() {
+            pause.get().countDown();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while(!socket.isClosed()) {
+                    pause.get().await();
+                    try {
+                        Socket source = socket.accept();
+                        try {
+                            pause.get().await();
+                            if (receiveBufferSize > 0) {
+                                source.setReceiveBufferSize(receiveBufferSize);
+                            }
+                            LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
+                            synchronized(connections) {
+                                connections.add(new Bridge(source, target));
+                            }
+                        } catch (Exception e) {
+                            // no bridge owns the accepted socket, so close it here
+                            // before the failure is rethrown
+                            closeQuietly(source);
+                            throw e;
+                        }
+                    } catch (SocketTimeoutException expected) {
+                    }
+                }
+            } catch (Exception e) {
+                LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+            }
+        }
+
+        public void close() {
+            try {
+                socket.close();
+            } catch (IOException ignored) {
+                // nothing more can be done for the listener; waiters are released below
+            } finally {
+                closed.countDown();
+                goOn();
+            }
+        }
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java
new file mode 100644
index 0000000..244db59
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test helper that polls a {@link Condition} until it holds or a time budget
+ * is used up.
+ */
+public class Wait {
+
+    /** Default time budget in milliseconds. */
+    public static final long MAX_WAIT_MILLIS = 30*1000;
+    /** Default pause between two evaluations of the condition, in milliseconds. */
+    public static final long SLEEP_MILLIS = 1000;
+
+    /** The predicate to poll; an exception thrown by it aborts the wait. */
+    public interface Condition {
+        boolean isSatisified() throws Exception;
+    }
+
+    /**
+     * Polls {@code condition} every {@link #SLEEP_MILLIS} ms for up to
+     * {@link #MAX_WAIT_MILLIS} ms.
+     *
+     * @return the value of the last evaluation of the condition
+     */
+    public static boolean waitFor(Condition condition) throws Exception {
+        return waitFor(condition, MAX_WAIT_MILLIS);
+    }
+
+    /**
+     * Polls {@code condition} every {@link #SLEEP_MILLIS} ms for up to
+     * {@code duration} ms.
+     *
+     * @return the value of the last evaluation of the condition
+     */
+    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+        return waitFor(condition, duration, SLEEP_MILLIS);
+    }
+
+    /**
+     * Evaluates {@code condition} once and then, while it is false and the
+     * time budget is not used up, sleeps and evaluates it again.
+     *
+     * @param condition the predicate to poll
+     * @param duration time budget in milliseconds; zero or negative means a single evaluation
+     * @param sleepMillis pause between two evaluations in milliseconds, shortened
+     *        when less than that is left of the budget
+     * @return the value of the last evaluation of the condition
+     * @throws Exception whatever the condition throws, or
+     *         {@link InterruptedException} if the thread is interrupted while sleeping
+     */
+    public static boolean waitFor(final Condition condition, final long duration, final long sleepMillis) throws Exception {
+
+        // nanoTime is monotonic, and comparing elapsed time against the budget
+        // cannot overflow the way "now + duration" does for huge durations
+        final long budgetNanos = TimeUnit.MILLISECONDS.toNanos(duration);
+        final long sleepNanos = TimeUnit.MILLISECONDS.toNanos(sleepMillis);
+        final long start = System.nanoTime();
+
+        boolean conditionSatisified = condition.isSatisified();
+        while (!conditionSatisified) {
+            final long remainingNanos = budgetNanos - (System.nanoTime() - start);
+            if (remainingNanos <= 0) {
+                break;
+            }
+            // never sleep past the end of the budget
+            TimeUnit.NANOSECONDS.sleep(Math.min(sleepNanos, remainingNanos));
+            conditionSatisified = condition.isSatisified();
+        }
+        return conditionSatisified;
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml b/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml
new file mode 100644
index 0000000..4bd5fc7
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <amq:broker useJmx="false" persistent="false" start="false" brokerName="dummy">
+
+ <amq:sslContext>
+ <amq:sslContext
+ keyStore="dummy.keystore" keyStorePassword="password"/>
+ </amq:sslContext>
+
+ <amq:transportConnectors>
+ <amq:transportConnector uri="ssl://localhost:62616" />
+ </amq:transportConnectors>
+
+ </amq:broker>
+
+ <amq:broker useJmx="false" persistent="false" start="false" brokerName="activemq.org">
+ <amq:sslContext>
+ <amq:sslContext
+ keyStore="server.keystore" keyStorePassword="password"
+ trustStore="client.keystore" trustStorePassword="password"/>
+ </amq:sslContext>
+
+ <amq:transportConnectors>
+ <amq:transportConnector uri="ssl://localhost:63616" />
+ </amq:transportConnectors>
+
+ </amq:broker>
+</beans>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index d8aa4ac..a3bae65 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -50,10 +50,10 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.ServerSessionFactory;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
+
+ @Override
public boolean isWritable(ReadyListener callback) {
return true;
}
@@ -502,7 +507,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
*/
@Override
- public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
inCall.countDown();
try {
callbackSemaphore.acquire();
@@ -513,7 +518,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
try {
- return targetCallback.sendMessage(message, consumer, deliveryCount);
+ return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
}
finally {
callbackSemaphore.release();
@@ -525,8 +530,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
@Override
- public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
- return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount);
+ public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
}
/* (non-Javadoc)
@@ -576,7 +581,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
String defaultAddress,
SessionCallback callback,
OperationContext context,
- ServerSessionFactory sessionFactory,
boolean autoCreateQueue) throws Exception {
return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
index d2e3215..09fd9b7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Before;
@@ -44,6 +45,9 @@ public class BasicOpenWireTest extends OpenWireTestBase {
protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+ protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
+
+
protected ActiveMQConnection connection;
protected String topicName = "amqTestTopic1";
protected String queueName = "amqTestQueue1";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
index a1a5e38..14cfee0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
@@ -26,6 +26,7 @@ import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -118,7 +119,7 @@ public class BasicSecurityTest extends BasicOpenWireTest {
}
@Test
- public void testSendnReceiveAuthorization() throws Exception {
+ public void testSendnReceiveAuthorization() throws Exception {
Connection sendingConn = null;
Connection receivingConn = null;
@@ -152,16 +153,18 @@ public class BasicSecurityTest extends BasicOpenWireTest {
producer = sendingSession.createProducer(dest);
producer.send(message);
- MessageConsumer consumer = null;
+ MessageConsumer consumer;
try {
consumer = sendingSession.createConsumer(dest);
+ Assert.fail("exception expected");
}
catch (JMSSecurityException e) {
+ e.printStackTrace();
//expected
}
consumer = receivingSession.createConsumer(dest);
- TextMessage received = (TextMessage) consumer.receive();
+ TextMessage received = (TextMessage) consumer.receive(5000);
assertNotNull(received);
assertEquals("Hello World", received.getText());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
index 825b8b5..69d9784 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import static org.junit.Assert.assertEquals;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.junit.Test;
public class OpenWireUtilTest {