You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/04 01:16:33 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5621
Repository: activemq
Updated Branches:
refs/heads/master 4d5bb4ab7 -> 257a4fa41
https://issues.apache.org/jira/browse/AMQ-5621
Clean up test, don't start JMX connector as it is not needed, add
timeout.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/257a4fa4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/257a4fa4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/257a4fa4
Branch: refs/heads/master
Commit: 257a4fa416ec2b5ad757d78c71504830ced70963
Parents: 4d5bb4a
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 3 19:16:28 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 3 19:16:28 2015 -0500
----------------------------------------------------------------------
.../org/apache/activemq/bugs/AMQ3678Test.java | 113 +++++--------------
1 file changed, 27 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/257a4fa4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
index 3c79fcf..59f7faa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
@@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.bugs;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.ServerSocket;
+import static org.junit.Assert.fail;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,59 +29,34 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQTopicSubscriber;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.fail;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQ3678Test implements MessageListener {
- public int deliveryMode = DeliveryMode.NON_PERSISTENT;
-
+ private static Logger LOG = LoggerFactory.getLogger(AMQ3678Test.class);
private BrokerService broker;
+ private String connectionURI;
- AtomicInteger messagesSent = new AtomicInteger(0);
- AtomicInteger messagesReceived = new AtomicInteger(0);
-
- ActiveMQTopic destination = new ActiveMQTopic("XYZ");
-
- int port;
- int jmxport;
-
+ private final AtomicInteger messagesSent = new AtomicInteger(0);
+ private final AtomicInteger messagesReceived = new AtomicInteger(0);
+ private final ActiveMQTopic destination = new ActiveMQTopic("XYZ");
+ private final CountDownLatch latch = new CountDownLatch(2);
+ private final int deliveryMode = DeliveryMode.NON_PERSISTENT;
- final CountDownLatch latch = new CountDownLatch(2);
-
-
- public static void main(String[] args) throws Exception {
-
- }
-
-
- public static int findFreePort() throws IOException {
- ServerSocket socket = null;
-
- try {
- // 0 is open a socket on any free port
- socket = new ServerSocket(0);
- return socket.getLocalPort();
- } finally {
- if (socket != null) {
- socket.close();
- }
- }
- }
-
-
- @Test
+ @Test(timeout = 60000)
public void countConsumers() throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + port);
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
factory.setAlwaysSyncSend(true);
factory.setDispatchAsync(false);
@@ -95,13 +68,13 @@ public class AMQ3678Test implements MessageListener {
consumerConnection.setClientID("subscriber1");
Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination, "myTopic?consumer.prefetchSize=1");
+ ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination,
+ "myTopic?consumer.prefetchSize=1");
activeConsumer.setMessageListener(this);
consumerConnection.start();
-
final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = producerSession.createProducer(destination);
producer.setDeliveryMode(deliveryMode);
@@ -110,17 +83,14 @@ public class AMQ3678Test implements MessageListener {
private boolean done = false;
+ @Override
public void run() {
while (!done) {
if (messagesSent.get() == 50) {
try {
broker.getAdminView().removeTopic(destination.getTopicName());
} catch (Exception e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- System.err.flush();
- fail("Unable to remove destination:"
- + destination.getPhysicalName());
+ fail("Unable to remove destination:" + destination.getPhysicalName());
}
}
@@ -128,18 +98,15 @@ public class AMQ3678Test implements MessageListener {
producer.send(producerSession.createTextMessage());
int val = messagesSent.incrementAndGet();
- System.out.println("sent message (" + val + ")");
- System.out.flush();
+ LOG.trace("sent message (" + val + ")");
if (val == 100) {
done = true;
latch.countDown();
producer.close();
producerSession.close();
-
}
} catch (JMSException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
}
@@ -153,68 +120,42 @@ public class AMQ3678Test implements MessageListener {
fail("did not receive all the messages");
}
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
fail("did not receive all the messages, exception waiting for latch");
- e.printStackTrace();
}
-
-
-//
-
-
}
@Before
public void setUp() throws Exception {
-
- try {
- port = findFreePort();
- jmxport = findFreePort();
- } catch (Exception e) {
- fail("Unable to obtain a free port on which to start the broker");
- }
-
- System.out.println("Starting broker");
- System.out.flush();
broker = new BrokerService();
broker.setPersistent(false);
- ManagementContext ctx = new ManagementContext(ManagementFactory.getPlatformMBeanServer());
- ctx.setConnectorPort(jmxport);
- broker.setManagementContext(ctx);
broker.setUseJmx(true);
-// broker.setAdvisorySupport(false);
-// broker.setDeleteAllMessagesOnStartup(true);
+ broker.getManagementContext().setCreateConnector(false);
+ broker.setAdvisorySupport(false);
- broker.addConnector("tcp://localhost:" + port).setName("Default");
+ TransportConnector connector = broker.addConnector("tcp://localhost:0");
broker.start();
+ broker.waitUntilStarted();
-
- System.out.println("End of Broker Setup");
- System.out.flush();
+ connectionURI = connector.getPublishableConnectString();
}
@After
public void tearDown() throws Exception {
broker.stop();
+ broker.waitUntilStopped();
}
-
@Override
public void onMessage(Message message) {
try {
message.acknowledge();
int val = messagesReceived.incrementAndGet();
- System.out.println("received message (" + val + ")");
- System.out.flush();
+ LOG.trace("received message (" + val + ")");
if (messagesReceived.get() == 100) {
latch.countDown();
}
} catch (JMSException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
-
}
-
-
}