You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/04 23:43:05 UTC
[21/58] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
deleted file mode 100644
index 74c19b7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
+++ /dev/null
@@ -1,763 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3274Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class);
-
- protected static int Next_broker_num = 0;
- protected EmbeddedTcpBroker broker1;
- protected EmbeddedTcpBroker broker2;
-
- protected int nextEchoId = 0;
- protected boolean testError = false;
-
- protected int echoResponseFill = 0; // Number of "filler" response messages per request
-
- public AMQ3274Test() throws Exception {
- broker1 = new EmbeddedTcpBroker();
- broker2 = new EmbeddedTcpBroker();
-
- broker1.coreConnectTo(broker2, true);
- broker2.coreConnectTo(broker1, true);
- }
-
- public void logMessage(String msg) {
- System.out.println(msg);
- System.out.flush();
- }
-
- public void testMessages(Session sess,
- MessageProducer req_prod,
- Destination resp_dest,
- int num_msg) throws Exception {
- MessageConsumer resp_cons;
- TextMessage msg;
- MessageClient cons_client;
- int cur;
- int tot_expected;
-
- resp_cons = sess.createConsumer(resp_dest);
-
- cons_client = new MessageClient(resp_cons, num_msg);
- cons_client.start();
-
- cur = 0;
- while ((cur < num_msg) && (!testError)) {
- msg = sess.createTextMessage("MSG AAAA " + cur);
- msg.setIntProperty("SEQ", 100 + cur);
- msg.setStringProperty("TEST", "TOPO");
- msg.setJMSReplyTo(resp_dest);
-
- if (cur == (num_msg - 1))
- msg.setBooleanProperty("end-of-response", true);
-
- req_prod.send(msg);
-
- cur++;
- }
-
- cons_client.waitShutdown(5000);
-
- if (cons_client.shutdown()) {
- LOG.debug("Consumer client shutdown complete");
- }
- else {
- LOG.debug("Consumer client shutdown incomplete!!!");
- }
-
- tot_expected = num_msg * (echoResponseFill + 1);
-
- if (cons_client.getNumMsgReceived() == tot_expected) {
- LOG.info("Have " + tot_expected + " messages, as-expected");
- }
- else {
- LOG.error("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
- testError = true;
- }
-
- resp_cons.close();
- }
-
- /**
- * Test one destination between the given "producer broker" and
- * "consumer broker" specified.
- */
- public void testOneDest(Connection conn,
- Session sess,
- Destination cons_dest,
- String prod_broker_url,
- String cons_broker_url,
- int num_msg) throws Exception {
- int echo_id;
-
- EchoService echo_svc;
- String echo_queue_name;
- Destination prod_dest;
- MessageProducer msg_prod;
-
- synchronized (this) {
- echo_id = this.nextEchoId;
- this.nextEchoId++;
- }
-
- echo_queue_name = "echo.queue." + echo_id;
-
- LOG.trace("destroying the echo queue in case an old one exists");
- removeQueue(conn, echo_queue_name);
-
- echo_svc = new EchoService(echo_queue_name, prod_broker_url);
- echo_svc.start();
-
- LOG.trace("Creating echo queue and producer");
- prod_dest = sess.createQueue(echo_queue_name);
- msg_prod = sess.createProducer(prod_dest);
-
- testMessages(sess, msg_prod, cons_dest, num_msg);
-
- echo_svc.shutdown();
- msg_prod.close();
- }
-
- /**
- * TEST TEMPORARY TOPICS
- */
- public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception {
- Connection conn;
- Session sess;
- Destination cons_dest;
- int num_msg;
-
- num_msg = 5;
-
- LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- LOG.trace("Creating destination");
- cons_dest = sess.createTemporaryTopic();
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
- sess.close();
- conn.close();
- }
-
- /**
- * TEST TOPICS
- */
- public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception {
- int num_msg;
-
- Connection conn;
- Session sess;
- String topic_name;
-
- Destination cons_dest;
-
- num_msg = 5;
-
- LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- topic_name = "topotest2.perm.topic";
- LOG.trace("Removing existing Topic");
- removeTopic(conn, topic_name);
- LOG.trace("Creating Topic, " + topic_name);
- cons_dest = sess.createTopic(topic_name);
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
- removeTopic(conn, topic_name);
- sess.close();
- conn.close();
- }
-
- /**
- * TEST TEMPORARY QUEUES
- */
- public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception {
- int num_msg;
-
- Connection conn;
- Session sess;
-
- Destination cons_dest;
-
- num_msg = 5;
-
- LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- LOG.trace("Creating destination");
- cons_dest = sess.createTemporaryQueue();
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
- sess.close();
- conn.close();
- }
-
- /**
- * TEST QUEUES
- */
- public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception {
- int num_msg;
-
- Connection conn;
- Session sess;
- String queue_name;
-
- Destination cons_dest;
-
- num_msg = 5;
-
- LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- queue_name = "topotest2.perm.queue";
- LOG.trace("Removing existing Queue");
- removeQueue(conn, queue_name);
- LOG.trace("Creating Queue, " + queue_name);
- cons_dest = sess.createQueue(queue_name);
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
- removeQueue(conn, queue_name);
- sess.close();
- conn.close();
- }
-
- @Test
- public void run() throws Exception {
- Thread start1;
- Thread start2;
-
- testError = false;
-
- // Use threads to avoid startup deadlock since the first broker started waits until
- // it knows the name of the remote broker before finishing its startup, which means
- // the remote must already be running.
-
- start1 = new Thread() {
- @Override
- public void run() {
- try {
- broker1.start();
- }
- catch (Exception ex) {
- LOG.error(null, ex);
- }
- }
- };
-
- start2 = new Thread() {
- @Override
- public void run() {
- try {
- broker2.start();
- }
- catch (Exception ex) {
- LOG.error(null, ex);
- }
- }
- };
-
- start1.start();
- start2.start();
-
- start1.join();
- start2.join();
-
- if (!testError) {
- this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
- }
- if (!testError) {
- this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
- }
- if (!testError) {
- this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
- }
- if (!testError) {
- this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
- }
- Thread.sleep(100);
-
- shutdown();
-
- assertTrue(!testError);
- }
-
- public void shutdown() throws Exception {
- broker1.stop();
- broker2.stop();
- }
-
- /**
- * @param args the command line arguments
- */
- public static void main(String[] args) {
- AMQ3274Test main_obj;
-
- try {
- main_obj = new AMQ3274Test();
- main_obj.run();
- }
- catch (Exception ex) {
- ex.printStackTrace();
- LOG.error(null, ex);
- System.exit(0);
- }
- }
-
- protected Connection createConnection(String url) throws Exception {
- return org.apache.activemq.ActiveMQConnection.makeConnection(url);
- }
-
- protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception {
- org.apache.activemq.command.ActiveMQDestination dest;
-
- if (conn instanceof org.apache.activemq.ActiveMQConnection) {
- dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
- ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
- }
- }
-
- protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception {
- org.apache.activemq.command.ActiveMQDestination dest;
-
- if (conn instanceof org.apache.activemq.ActiveMQConnection) {
- dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
- ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
- }
- }
-
- @SuppressWarnings("rawtypes")
- public static String fmtMsgInfo(Message msg) throws Exception {
- StringBuilder msg_desc;
- String prop;
- Enumeration prop_enum;
-
- msg_desc = new StringBuilder();
- msg_desc = new StringBuilder();
-
- if (msg instanceof TextMessage) {
- msg_desc.append(((TextMessage) msg).getText());
- }
- else {
- msg_desc.append("[");
- msg_desc.append(msg.getClass().getName());
- msg_desc.append("]");
- }
-
- prop_enum = msg.getPropertyNames();
- while (prop_enum.hasMoreElements()) {
- prop = (String) prop_enum.nextElement();
- msg_desc.append("; ");
- msg_desc.append(prop);
- msg_desc.append("=");
- msg_desc.append(msg.getStringProperty(prop));
- }
-
- return msg_desc.toString();
- }
-
- // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // /////////////////////////////////////////////// INTERNAL CLASSES
- // /////////////////////////////////////////////////
- // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- protected class EmbeddedTcpBroker {
-
- protected BrokerService brokerSvc;
- protected int brokerNum;
- protected String brokerName;
- protected String brokerId;
- protected int port;
- protected String tcpUrl;
-
- public EmbeddedTcpBroker() throws Exception {
- brokerSvc = new BrokerService();
-
- synchronized (this.getClass()) {
- brokerNum = Next_broker_num;
- Next_broker_num++;
- }
-
- brokerName = "broker" + brokerNum;
- brokerId = "b" + brokerNum;
-
- brokerSvc.setBrokerName(brokerName);
- brokerSvc.setBrokerId(brokerId);
- brokerSvc.setPersistent(false);
- brokerSvc.setUseJmx(false);
- tcpUrl = brokerSvc.addConnector("tcp://localhost:0").getPublishableConnectString();
- }
-
- public Connection createConnection() throws URISyntaxException, JMSException {
- Connection result;
-
- result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
-
- return result;
- }
-
- public String getConnectionUrl() {
- return this.tcpUrl;
- }
-
- /**
- * Create network connections to the given broker using the
- * network-connector configuration of CORE brokers (e.g.
- * core1.bus.dev1.coresys.tmcs)
- *
- * @param other
- * @param duplex_f
- */
- public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception {
- this.makeConnectionTo(other, duplex_f, true);
- this.makeConnectionTo(other, duplex_f, false);
- }
-
- public void start() throws Exception {
- brokerSvc.start();
- }
-
- public void stop() throws Exception {
- brokerSvc.stop();
- }
-
- /**
- * Make one connection to the other embedded broker, of the specified
- * type (queue or topic) using the standard CORE broker networking.
- *
- * @param other
- * @param duplex_f
- * @param queue_f
- * @throws Exception
- */
- protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception {
- NetworkConnector nw_conn;
- String prefix;
- ActiveMQDestination excl_dest;
- ArrayList<ActiveMQDestination> excludes;
-
- nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
- nw_conn.setDuplex(duplex_f);
-
- if (queue_f)
- nw_conn.setConduitSubscriptions(false);
- else
- nw_conn.setConduitSubscriptions(true);
-
- nw_conn.setNetworkTTL(5);
- nw_conn.setSuppressDuplicateQueueSubscriptions(true);
- nw_conn.setDecreaseNetworkConsumerPriority(true);
- nw_conn.setBridgeTempDestinations(true);
-
- if (queue_f) {
- prefix = "queue";
- excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
- }
- else {
- prefix = "topic";
- excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
- }
-
- excludes = new ArrayList<>();
- excludes.add(excl_dest);
- nw_conn.setExcludedDestinations(excludes);
-
- if (duplex_f)
- nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
- else
- nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
-
- brokerSvc.addNetworkConnector(nw_conn);
- }
- }
-
- protected class MessageClient extends java.lang.Thread {
-
- protected MessageConsumer msgCons;
- protected boolean shutdownInd;
- protected int expectedCount;
- protected int lastSeq = 0;
- protected int msgCount = 0;
- protected boolean haveFirstSeq;
- protected CountDownLatch shutdownLatch;
-
- public MessageClient(MessageConsumer cons, int num_to_expect) {
- msgCons = cons;
- expectedCount = (num_to_expect * (echoResponseFill + 1));
- shutdownLatch = new CountDownLatch(1);
- }
-
- @Override
- public void run() {
- CountDownLatch latch;
-
- try {
- synchronized (this) {
- latch = shutdownLatch;
- }
-
- shutdownInd = false;
- processMessages();
-
- latch.countDown();
- }
- catch (Exception exc) {
- LOG.error("message client error", exc);
- }
- }
-
- public void waitShutdown(long timeout) {
- CountDownLatch latch;
-
- try {
- synchronized (this) {
- latch = shutdownLatch;
- }
-
- if (latch != null)
- latch.await(timeout, TimeUnit.MILLISECONDS);
- else
- LOG.info("echo client shutdown: client does not appear to be active");
- }
- catch (InterruptedException int_exc) {
- LOG.warn("wait for message client shutdown interrupted", int_exc);
- }
- }
-
- public boolean shutdown() {
- boolean down_ind;
-
- if (!shutdownInd) {
- shutdownInd = true;
- }
-
- waitShutdown(200);
-
- synchronized (this) {
- if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0))
- down_ind = true;
- else
- down_ind = false;
- }
-
- return down_ind;
- }
-
- public int getNumMsgReceived() {
- return msgCount;
- }
-
- protected void processMessages() throws Exception {
- Message in_msg;
-
- haveFirstSeq = false;
- while ((!shutdownInd) && (!testError)) {
- in_msg = msgCons.receive(100);
-
- if (in_msg != null) {
- msgCount++;
- checkMessage(in_msg);
- }
- }
- }
-
- protected void checkMessage(Message in_msg) throws Exception {
- int seq;
-
- LOG.debug("received message " + fmtMsgInfo(in_msg));
-
- if (in_msg.propertyExists("SEQ")) {
- seq = in_msg.getIntProperty("SEQ");
-
- if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
- LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq + 1) + " but have " + Integer.toString(seq));
-
- testError = true;
- }
-
- lastSeq = seq;
-
- if (msgCount > expectedCount) {
- LOG.warn("*** have more messages than expected; have " + msgCount + "; expect " + expectedCount);
-
- testError = true;
- }
- }
-
- if (in_msg.propertyExists("end-of-response")) {
- LOG.trace("received end-of-response message");
- shutdownInd = true;
- }
- }
- }
-
- protected class EchoService extends java.lang.Thread {
-
- protected String destName;
- protected Connection jmsConn;
- protected Session sess;
- protected MessageConsumer msg_cons;
- protected boolean Shutdown_ind;
-
- protected Destination req_dest;
- protected Destination resp_dest;
- protected MessageProducer msg_prod;
-
- protected CountDownLatch waitShutdown;
-
- public EchoService(String dest, Connection broker_conn) throws Exception {
- destName = dest;
- jmsConn = broker_conn;
-
- Shutdown_ind = false;
-
- sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- req_dest = sess.createQueue(destName);
- msg_cons = sess.createConsumer(req_dest);
-
- jmsConn.start();
-
- waitShutdown = new CountDownLatch(1);
- }
-
- public EchoService(String dest, String broker_url) throws Exception {
- this(dest, ActiveMQConnection.makeConnection(broker_url));
- }
-
- @Override
- public void run() {
- Message req;
-
- try {
- LOG.info("STARTING ECHO SERVICE");
-
- while (!Shutdown_ind) {
- req = msg_cons.receive(100);
- if (req != null) {
- if (LOG.isDebugEnabled())
- LOG.debug("ECHO request message " + req.toString());
-
- resp_dest = req.getJMSReplyTo();
- if (resp_dest != null) {
- msg_prod = sess.createProducer(resp_dest);
- msg_prod.send(req);
- msg_prod.close();
- msg_prod = null;
- }
- else {
- LOG.warn("invalid request: no reply-to destination given");
- }
- }
- }
- }
- catch (Exception ex) {
- LOG.error(null, ex);
- }
- finally {
- LOG.info("shutting down test echo service");
-
- try {
- jmsConn.stop();
- }
- catch (javax.jms.JMSException jms_exc) {
- LOG.warn("error on shutting down JMS connection", jms_exc);
- }
-
- synchronized (this) {
- waitShutdown.countDown();
- }
- }
- }
-
- /**
- * Shut down the service, waiting up to 3 seconds for the service to
- * terminate.
- */
- public void shutdown() {
- CountDownLatch wait_l;
-
- synchronized (this) {
- wait_l = waitShutdown;
- }
-
- Shutdown_ind = true;
-
- try {
- if (wait_l != null) {
- if (wait_l.await(3000, TimeUnit.MILLISECONDS)) {
- LOG.info("echo service shutdown complete");
- }
- else {
- LOG.warn("timeout waiting for echo service shutdown");
- }
- }
- else {
- LOG.info("echo service shutdown: service does not appear to be active");
- }
- }
- catch (InterruptedException int_exc) {
- LOG.warn("interrupted while waiting for echo service shutdown");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
deleted file mode 100644
index a90521b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3324Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class);
-
- private static final String bindAddress = "tcp://0.0.0.0:0";
- private BrokerService broker;
- private ActiveMQConnectionFactory cf;
-
- private static final int MESSAGE_COUNT = 100;
-
- @Before
- public void setUp() throws Exception {
- broker = this.createBroker();
- String address = broker.getTransportConnectors().get(0).getPublishableConnectString();
- broker.start();
- broker.waitUntilStarted();
-
- cf = new ActiveMQConnectionFactory(address);
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Test
- public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception {
-
- Connection connection = cf.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final TemporaryQueue queue = session.createTemporaryQueue();
- MessageConsumer consumer = session.createConsumer(queue);
-
- final Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
-
- MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
- MessageProducer producer = session.createProducer(queue);
-
- // send lots of messages to the tempQueue
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes(new byte[1024]);
- producer.send(m);
- }
-
- // consume one message from tempQueue
- Message msg = consumer.receive(5000);
- assertNotNull(msg);
-
- // check one advisory message has produced on the advisoryTopic
- Message advCmsg = advisoryConsumer.receive(5000);
- assertNotNull(advCmsg);
-
- connection.close();
- LOG.debug("Connection closed, destinations should now become inactive.");
-
- assertTrue("The destination " + advisoryTopic + "was not removed. ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getAdminView().getTopics().length == 0;
- }
- }));
-
- assertTrue("The destination " + queue + " was not removed. ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getAdminView().getTemporaryQueues().length == 0;
- }
- }));
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseMirroredQueues(true);
- answer.setPersistent(false);
- answer.setSchedulePeriodForDestinationPurge(1000);
-
- PolicyEntry entry = new PolicyEntry();
- entry.setGcInactiveDestinations(true);
- entry.setInactiveTimeoutBeforeGC(2000);
- entry.setProducerFlowControl(true);
- entry.setAdvisoryForConsumed(true);
- entry.setAdvisoryForFastProducers(true);
- entry.setAdvisoryForDelivery(true);
- PolicyMap map = new PolicyMap();
- map.setDefaultEntry(entry);
-
- MirroredQueue mirrorQ = new MirroredQueue();
- mirrorQ.setCopyMessage(true);
- DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
- answer.setDestinationInterceptors(destinationInterceptors);
-
- answer.setDestinationPolicy(map);
- answer.addConnector(bindAddress);
-
- return answer;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java
deleted file mode 100644
index 6f27bdd..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3352Test {
-
- TransportConnector connector;
- BrokerService brokerService;
-
- @Before
- public void startBroker() throws Exception {
- brokerService = new BrokerService();
- brokerService.setDeleteAllMessagesOnStartup(true);
- connector = brokerService.addConnector("tcp://0.0.0.0:0");
- brokerService.start();
- }
-
- @After
- public void stopBroker() throws Exception {
- brokerService.stop();
- }
-
- @Test
- public void verifyEnqueueLargeNumWithStateTracker() throws Exception {
- String url = "failover:(" + connector.getPublishableConnectString() + ")?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=131072";
-
- ActiveMQConnection conn = (ActiveMQConnection) new ActiveMQConnectionFactory(url).createConnection(null, null);
-
- Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageProducer producer = session.createProducer(session.createQueue("EVENTQ"));
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- producer.setDisableMessageID(true);
- producer.setDisableMessageTimestamp(true);
-
- StringBuffer buffer = new StringBuffer();
- for (int i = 0; i < 1024; i++) {
- buffer.append(String.valueOf(Math.random()));
- }
- String payload = buffer.toString();
-
- for (int i = 0; i < 10000; i++) {
- StringBuffer buff = new StringBuffer("x");
- buff.append(payload);
- producer.send(session.createTextMessage(buff.toString()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
deleted file mode 100644
index 5a58dd3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3405Test extends TestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ3405Test.class);
-
- private Connection connection;
- private Session session;
- private MessageConsumer consumer;
- private MessageProducer producer;
- private int deliveryMode = DeliveryMode.PERSISTENT;
- private Destination dlqDestination;
- private MessageConsumer dlqConsumer;
- private BrokerService broker;
-
- private int messageCount;
- private Destination destination;
- private int rollbackCount;
- private Session dlqSession;
- private final Error[] error = new Error[1];
- private boolean topic = true;
- private boolean durableSubscriber = true;
-
- public void testTransientTopicMessage() throws Exception {
- topic = true;
- deliveryMode = DeliveryMode.NON_PERSISTENT;
- durableSubscriber = true;
- doTest();
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- PolicyEntry policy = new PolicyEntry();
- DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
- if (defaultDeadLetterStrategy != null) {
- defaultDeadLetterStrategy.setProcessNonPersistent(true);
- }
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(pMap);
- return broker;
- }
-
- protected void doTest() throws Exception {
- messageCount = 200;
- connection.start();
-
- final QueueViewMBean dlqView = getProxyToDLQ();
-
- ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
- rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
- LOG.info("Will redeliver messages: " + rollbackCount + " times");
-
- makeConsumer();
- makeDlqConsumer();
- dlqConsumer.close();
-
- sendMessages();
-
- // now lets receive and rollback N times
- int maxRollbacks = messageCount * rollbackCount;
-
- consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, rollbackCount));
-
- // We receive and rollback into the DLQ N times moving the DLQ messages back to their
- // original Q to test that they are continually placed back in the DLQ.
- for (int i = 0; i < 2; ++i) {
-
- assertTrue("DLQ was not filled as expected", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return dlqView.getQueueSize() == messageCount;
- }
- }));
-
- connection.stop();
-
- assertEquals("DLQ should be full now.", messageCount, dlqView.getQueueSize());
-
- String moveTo;
- if (topic) {
- moveTo = "topic://" + ((Topic) getDestination()).getTopicName();
- }
- else {
- moveTo = "queue://" + ((Queue) getDestination()).getQueueName();
- }
-
- LOG.debug("Moving " + messageCount + " messages from ActiveMQ.DLQ to " + moveTo);
- dlqView.moveMatchingMessagesTo("", moveTo);
-
- assertTrue("DLQ was not emptied as expected", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return dlqView.getQueueSize() == 0;
- }
- }));
-
- connection.start();
- }
- }
-
- protected void makeConsumer() throws JMSException {
- Destination destination = getDestination();
- LOG.info("Consuming from: " + destination);
- if (durableSubscriber) {
- consumer = session.createDurableSubscriber((Topic) destination, destination.toString());
- }
- else {
- consumer = session.createConsumer(destination);
- }
- }
-
- protected void makeDlqConsumer() throws JMSException {
- dlqDestination = createDlqDestination();
-
- LOG.info("Consuming from dead letter on: " + dlqDestination);
- dlqConsumer = dlqSession.createConsumer(dlqDestination);
- }
-
- @Override
- protected void setUp() throws Exception {
- broker = createBroker();
- broker.start();
- broker.waitUntilStarted();
-
- connection = createConnection();
- connection.setClientID(createClientId());
-
- session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- connection.start();
-
- dlqSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- @Override
- protected void tearDown() throws Exception {
- dlqConsumer.close();
- dlqSession.close();
- session.close();
-
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Override
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- ActiveMQConnectionFactory answer = super.createConnectionFactory();
- RedeliveryPolicy policy = new RedeliveryPolicy();
- policy.setMaximumRedeliveries(3);
- policy.setBackOffMultiplier((short) 1);
- policy.setRedeliveryDelay(0);
- policy.setInitialRedeliveryDelay(0);
- policy.setUseExponentialBackOff(false);
- answer.setRedeliveryPolicy(policy);
- return answer;
- }
-
- protected void sendMessages() throws JMSException {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(getDestination());
- producer.setDeliveryMode(deliveryMode);
-
- LOG.info("Sending " + messageCount + " messages to: " + getDestination());
- for (int i = 0; i < messageCount; i++) {
- Message message = createMessage(session, i);
- producer.send(message);
- }
- }
-
- protected TextMessage createMessage(Session session, int i) throws JMSException {
- return session.createTextMessage(getMessageText(i));
- }
-
- protected String getMessageText(int i) {
- return "message: " + i;
- }
-
- protected Destination createDlqDestination() {
- return new ActiveMQQueue("ActiveMQ.DLQ");
- }
-
- private QueueViewMBean getProxyToDLQ() throws MalformedObjectNameException, JMSException {
- ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost," + "destinationType=Queue,destinationName=ActiveMQ.DLQ");
- QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
- return proxy;
- }
-
- protected Destination getDestination() {
- if (destination == null) {
- destination = createDestination();
- }
- return destination;
- }
-
- protected String createClientId() {
- return toString();
- }
-
- class RollbackMessageListener implements MessageListener {
-
- final int maxRollbacks;
- final int deliveryCount;
- final AtomicInteger rollbacks = new AtomicInteger();
-
- RollbackMessageListener(int c, int delvery) {
- maxRollbacks = c;
- deliveryCount = delvery;
- }
-
- @Override
- public void onMessage(Message message) {
- try {
- int expectedMessageId = rollbacks.get() / deliveryCount;
- LOG.info("expecting messageId: " + expectedMessageId);
- rollbacks.incrementAndGet();
- session.rollback();
- }
- catch (Throwable e) {
- LOG.error("unexpected exception:" + e, e);
- // propagating assertError to execution task will cause a hang
- // at shutdown
- if (e instanceof Error) {
- error[0] = (Error) e;
- }
- else {
- fail("unexpected exception: " + e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
deleted file mode 100644
index 8fd2765..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageConsumer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3436Test {
-
- protected static final Logger LOG = LoggerFactory.getLogger(AMQ3436Test.class);
-
- private BrokerService broker;
- private PersistenceAdapter adapter;
- private boolean useCache = true;
- private boolean prioritizeMessages = true;
-
- protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
- KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
- adapter.setConcurrentStoreAndDispatchQueues(false);
- adapter.setConcurrentStoreAndDispatchTopics(false);
- adapter.deleteAllMessages();
- return adapter;
- }
-
- @Before
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setBrokerName("priorityTest");
- broker.setAdvisorySupport(false);
- broker.setUseJmx(false);
- adapter = createPersistenceAdapter(true);
- broker.setPersistenceAdapter(adapter);
- PolicyEntry policy = new PolicyEntry();
- policy.setPrioritizedMessages(prioritizeMessages);
- policy.setUseCache(useCache);
- policy.setProducerFlowControl(false);
- PolicyMap policyMap = new PolicyMap();
- policyMap.put(new ActiveMQQueue("TEST"), policy);
-
- // do not process expired for one test
- PolicyEntry ignoreExpired = new PolicyEntry();
- SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
- ignoreExpiredStrategy.setProcessExpired(false);
- ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
-
- broker.setDestinationPolicy(policyMap);
- broker.start();
- broker.waitUntilStarted();
- }
-
- protected void tearDown() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- @Test
- public void testPriorityWhenConsumerCreatedBeforeProduction() throws Exception {
-
- int messageCount = 200;
- URI failoverUri = new URI("vm://priorityTest?jms.prefetchPolicy.all=1");
-
- ActiveMQQueue dest = new ActiveMQQueue("TEST?consumer.dispatchAsync=false");
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri);
- cf.setDispatchAsync(false);
-
- // Create producer
- ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
- producerConnection.setMessagePrioritySupported(true);
- producerConnection.start();
- final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSession.createProducer(dest);
-
- ActiveMQMessageConsumer consumer;
-
- // Create consumer on separate connection
- ActiveMQConnection consumerConnection = (ActiveMQConnection) cf.createConnection();
- consumerConnection.setMessagePrioritySupported(true);
- consumerConnection.start();
- final ActiveMQSession consumerSession = (ActiveMQSession) consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
- consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
-
- // Produce X number of messages with a session commit after each message
- Random random = new Random();
- for (int i = 0; i < messageCount; ++i) {
-
- Message message = producerSession.createTextMessage("Test message #" + i);
- producer.send(message, DeliveryMode.PERSISTENT, random.nextInt(10), 45 * 1000);
- producerSession.commit();
- }
- producer.close();
-
- // ***************************************************
- // If we create the consumer here instead of above, the
- // the messages will be consumed in priority order
- // ***************************************************
- //consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
-
- // Consume all of the messages we produce using a listener.
- // Don't exit until we get all the messages.
- final CountDownLatch latch = new CountDownLatch(messageCount);
- final StringBuffer failureMessage = new StringBuffer();
- consumer.setMessageListener(new MessageListener() {
- int lowestPrioritySeen = 10;
-
- boolean firstMessage = true;
-
- @Override
- public void onMessage(Message msg) {
- try {
-
- int currentPriority = msg.getJMSPriority();
- LOG.debug(currentPriority + "<=" + lowestPrioritySeen);
-
- // Ignore the first message priority since it is prefetched
- // and is out of order by design
- if (firstMessage == true) {
- firstMessage = false;
- LOG.debug("Ignoring first message since it was prefetched");
-
- }
- else {
-
- // Verify that we never see a priority higher than the
- // lowest
- // priority seen
- if (lowestPrioritySeen > currentPriority) {
- lowestPrioritySeen = currentPriority;
- }
- if (lowestPrioritySeen < currentPriority) {
- failureMessage.append("Incorrect priority seen (Lowest Priority = " + lowestPrioritySeen + " Current Priority = " + currentPriority + ")" + System.getProperty("line.separator"));
- }
- }
-
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- finally {
- latch.countDown();
- LOG.debug("Messages remaining = " + latch.getCount());
- }
- }
- });
-
- latch.await();
- consumer.close();
-
- // Cleanup producer resources
- producerSession.close();
- producerConnection.stop();
- producerConnection.close();
-
- // Cleanup consumer resources
- consumerSession.close();
- consumerConnection.stop();
- consumerConnection.close();
-
- // Report the failure if found
- if (failureMessage.length() > 0) {
- Assert.fail(failureMessage.toString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
deleted file mode 100644
index d36faf9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3445Test {
-
- private ConnectionFactory connectionFactory;
- private BrokerService broker;
- private String connectionUri;
-
- private final String queueName = "Consumer.MyApp.VirtualTopic.FOO";
- private final String topicName = "VirtualTopic.FOO";
-
- @Before
- public void startBroker() throws Exception {
- createBroker(true);
- }
-
- private void createBroker(boolean deleteMessages) throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(deleteMessages);
- broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
- broker.setAdvisorySupport(false);
- broker.addConnector("tcp://0.0.0.0:0");
- broker.start();
- broker.waitUntilStarted();
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- }
-
- private void restartBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- createBroker(false);
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Test
- public void testJDBCRetiansDestinationAfterRestart() throws Exception {
-
- broker.getAdminView().addQueue(queueName);
- broker.getAdminView().addTopic(topicName);
-
- assertTrue(findDestination(queueName, false));
- assertTrue(findDestination(topicName, true));
-
- QueueViewMBean queue = getProxyToQueueViewMBean();
- assertEquals(0, queue.getQueueSize());
-
- restartBroker();
-
- assertTrue(findDestination(queueName, false));
- queue = getProxyToQueueViewMBean();
- assertEquals(0, queue.getQueueSize());
-
- sendMessage();
- restartBroker();
- assertTrue(findDestination(queueName, false));
-
- queue = getProxyToQueueViewMBean();
- assertEquals(1, queue.getQueueSize());
- sendMessage();
- assertEquals(2, queue.getQueueSize());
-
- restartBroker();
- assertTrue(findDestination(queueName, false));
- queue = getProxyToQueueViewMBean();
- assertEquals(2, queue.getQueueSize());
- }
-
- private void sendMessage() throws Exception {
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createTopic(topicName));
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- producer.send(session.createTextMessage("Testing"));
- producer.close();
- connection.close();
- }
-
- private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
- ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + queueName + ",type=Broker,brokerName=localhost");
- QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
- return proxy;
- }
-
- private boolean findDestination(String name, boolean topic) throws Exception {
-
- ObjectName[] destinations;
-
- if (topic) {
- destinations = broker.getAdminView().getTopics();
- }
- else {
- destinations = broker.getAdminView().getQueues();
- }
-
- for (ObjectName destination : destinations) {
- if (destination.toString().contains(name)) {
- return true;
- }
- }
-
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
deleted file mode 100644
index 96f0c2c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3454Test extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ3454Test.class);
- private static final int MESSAGES_COUNT = 10000;
-
- public void testSendWithLotsOfDestinations() throws Exception {
- final BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.setDeleteAllMessagesOnStartup(true);
-
- broker.addConnector("tcp://localhost:0");
-
- // populate a bunch of destinations, validate the impact on a call to send
- ActiveMQDestination[] destinations = new ActiveMQDestination[MESSAGES_COUNT];
- for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
- destinations[idx] = new ActiveMQQueue(getDestinationName() + "-" + idx);
- }
- broker.setDestinations(destinations);
- broker.start();
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
- final Connection connection = factory.createConnection();
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
-
- long start = System.currentTimeMillis();
- for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
- Message message = session.createTextMessage("" + idx);
- producer.send(message);
- }
- LOG.info("Duration: " + (System.currentTimeMillis() - start) + " millis");
- producer.close();
- session.close();
-
- }
-
- protected String getDestinationName() {
- return getClass().getName() + "." + getName();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
deleted file mode 100644
index 5e6b2ff..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XASession;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.ActiveMQXAConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3465Test {
-
- private final String xaDestinationName = "DestinationXA";
- private final String destinationName = "Destination";
- private BrokerService broker;
- private String connectionUri;
- private long txGenerator = System.currentTimeMillis();
-
- private XAConnectionFactory xaConnectionFactory;
- private ConnectionFactory connectionFactory;
-
- @Before
- public void startBroker() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.addConnector("tcp://0.0.0.0:0");
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- xaConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri);
- }
-
- @After
- public void stopBroker() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- @Test
- public void testMixedXAandNonXAorTXSessions() throws Exception {
-
- XAConnection xaConnection = xaConnectionFactory.createXAConnection();
- xaConnection.start();
- XASession session = xaConnection.createXASession();
- XAResource resource = session.getXAResource();
- Destination dest = new ActiveMQQueue(xaDestinationName);
-
- // publish a message
- Xid tid = createXid();
- resource.start(tid, XAResource.TMNOFLAGS);
- MessageProducer producer = session.createProducer(dest);
- ActiveMQTextMessage message = new ActiveMQTextMessage();
- message.setText("Some Text");
- producer.send(message);
- resource.end(tid, XAResource.TMSUCCESS);
- resource.commit(tid, true);
- session.close();
-
- session = xaConnection.createXASession();
- MessageConsumer consumer = session.createConsumer(dest);
- tid = createXid();
- resource = session.getXAResource();
- resource.start(tid, XAResource.TMNOFLAGS);
- TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
- assertNotNull(receivedMessage);
- assertEquals("Some Text", receivedMessage.getText());
- resource.end(tid, XAResource.TMSUCCESS);
-
- // Test that a normal session doesn't operate on XASession state.
- Connection connection2 = connectionFactory.createConnection();
- connection2.start();
- ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- if (session2.isTransacted()) {
- session2.rollback();
- }
-
- session2.close();
-
- resource.commit(tid, true);
- }
-
- @Test
- public void testMixedXAandNonXALocalTXSessions() throws Exception {
-
- XAConnection xaConnection = xaConnectionFactory.createXAConnection();
- xaConnection.start();
- XASession session = xaConnection.createXASession();
- XAResource resource = session.getXAResource();
- Destination dest = new ActiveMQQueue(xaDestinationName);
-
- // publish a message
- Xid tid = createXid();
- resource.start(tid, XAResource.TMNOFLAGS);
- MessageProducer producer = session.createProducer(dest);
- ActiveMQTextMessage message = new ActiveMQTextMessage();
- message.setText("Some Text");
- producer.send(message);
- resource.end(tid, XAResource.TMSUCCESS);
- resource.commit(tid, true);
- session.close();
-
- session = xaConnection.createXASession();
- MessageConsumer consumer = session.createConsumer(dest);
- tid = createXid();
- resource = session.getXAResource();
- resource.start(tid, XAResource.TMNOFLAGS);
- TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
- assertNotNull(receivedMessage);
- assertEquals("Some Text", receivedMessage.getText());
- resource.end(tid, XAResource.TMSUCCESS);
-
- // Test that a normal session doesn't operate on XASession state.
- Connection connection2 = connectionFactory.createConnection();
- connection2.start();
- ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Destination destination = new ActiveMQQueue(destinationName);
- ActiveMQMessageProducer producer2 = (ActiveMQMessageProducer) session2.createProducer(destination);
- producer2.send(session2.createTextMessage("Local-TX"));
-
- if (session2.isTransacted()) {
- session2.rollback();
- }
-
- session2.close();
-
- resource.commit(tid, true);
- }
-
- public Xid createXid() throws IOException {
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream os = new DataOutputStream(baos);
- os.writeLong(++txGenerator);
- os.close();
- final byte[] bs = baos.toByteArray();
-
- return new Xid() {
- @Override
- public int getFormatId() {
- return 86;
- }
-
- @Override
- public byte[] getGlobalTransactionId() {
- return bs;
- }
-
- @Override
- public byte[] getBranchQualifier() {
- return bs;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
deleted file mode 100644
index 3d9d2d4..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.activemq.bugs;
-
-import java.util.Properties;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3529Test {
-
- private static Logger LOG = LoggerFactory.getLogger(AMQ3529Test.class);
-
- private ConnectionFactory connectionFactory;
- private Connection connection;
- private Session session;
- private BrokerService broker;
- private String connectionUri;
- private MessageConsumer consumer;
- private Context ctx = null;
-
- @Before
- public void startBroker() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.addConnector("tcp://0.0.0.0:0");
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- }
-
- @After
- public void stopBroker() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- @Test(timeout = 60000)
- public void testInterruptionAffects() throws Exception {
- ThreadGroup tg = new ThreadGroup("tg");
-
- assertEquals(0, tg.activeCount());
-
- Thread client = new Thread(tg, "client") {
-
- @Override
- public void run() {
- try {
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- assertNotNull(session);
-
- Properties props = new Properties();
- props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- props.setProperty(Context.PROVIDER_URL, "tcp://0.0.0.0:0");
- ctx = null;
- try {
- ctx = new InitialContext(props);
- }
- catch (NoClassDefFoundError e) {
- throw new NamingException(e.toString());
- }
- catch (Exception e) {
- throw new NamingException(e.toString());
- }
- Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C");
- consumer = session.createConsumer(destination);
- consumer.receive(10000);
- }
- catch (Exception e) {
- // Expect an exception here from the interrupt.
- }
- finally {
- // next line is the nature of the test, if I remove this
- // line, everything works OK
- try {
- consumer.close();
- }
- catch (JMSException e) {
- fail("Consumer Close failed with" + e.getMessage());
- }
- try {
- session.close();
- }
- catch (JMSException e) {
- fail("Session Close failed with" + e.getMessage());
- }
- try {
- connection.close();
- }
- catch (JMSException e) {
- fail("Connection Close failed with" + e.getMessage());
- }
- try {
- ctx.close();
- }
- catch (Exception e) {
- fail("Connection Close failed with" + e.getMessage());
- }
- }
- }
- };
- client.start();
- Thread.sleep(5000);
- client.interrupt();
- client.join();
- Thread.sleep(2000);
- Thread[] remainThreads = new Thread[tg.activeCount()];
- tg.enumerate(remainThreads);
- for (Thread t : remainThreads) {
- if (t.isAlive() && !t.isDaemon())
- fail("Remaining thread: " + t.toString());
- }
-
- ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
- while (root.getParent() != null) {
- root = root.getParent();
- }
- visit(root, 0);
- }
-
- // This method recursively visits all thread groups under `group'.
- public static void visit(ThreadGroup group, int level) {
- // Get threads in `group'
- int numThreads = group.activeCount();
- Thread[] threads = new Thread[numThreads * 2];
- numThreads = group.enumerate(threads, false);
-
- // Enumerate each thread in `group'
- for (int i = 0; i < numThreads; i++) {
- // Get thread
- Thread thread = threads[i];
- LOG.debug("Thread:" + thread.getName() + " is still running");
- }
-
- // Get thread subgroups of `group'
- int numGroups = group.activeGroupCount();
- ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
- numGroups = group.enumerate(groups, false);
-
- // Recursively visit each subgroup
- for (int i = 0; i < numGroups; i++) {
- visit(groups[i], level + 1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
deleted file mode 100644
index 614631f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Quick port to java to support AMQ build.
- *
- * This test demonstrates the classloader problem in the
- * ClassLoadingAwareObjectInputStream impl. If the first interface in the proxy
- * interfaces list is JDK and there are any subsequent interfaces that are NOT
- * JDK interfaces the ClassLoadingAwareObjectInputStream will ignore their
- * respective classloaders and cause the Proxy to throw an
- * IllegalArgumentException because the core JDK classloader can't load the
- * interfaces that are not JDK interfaces.
- *
- * See AMQ-3537
- *
- * @author jason.yankus
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class AMQ3537Test implements InvocationHandler, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * If the first and second element in this array are swapped, the test will
- * fail.
- */
- public static final Class[] TEST_CLASSES = new Class[]{List.class, NonJDKList.class, Serializable.class};
-
- /**
- * Underlying list
- */
- private final List l = new ArrayList<String>();
-
- @Before
- public void setUp() throws Exception {
- l.add("foo");
- }
-
- @Test
- public void testDeserializeProxy() throws Exception {
- // create the proxy
- List proxy = (List) java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(), TEST_CLASSES, this);
-
- // serialize it
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(proxy);
- byte[] serializedProxy = baos.toByteArray();
- oos.close();
- baos.close();
-
- // deserialize the proxy
- ClassLoadingAwareObjectInputStream claois = new ClassLoadingAwareObjectInputStream(new ByteArrayInputStream(serializedProxy));
-
- // this is where it fails due to the rudimentary classloader selection
- // in ClassLoadingAwareObjectInputStream
- List deserializedProxy = (List) claois.readObject();
-
- claois.close();
-
- // assert the invocation worked
- assertEquals("foo", deserializedProxy.get(0));
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- return method.invoke(l, args);
- }
-
- public interface NonJDKList {
-
- int size();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
deleted file mode 100644
index c567eb3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.DefaultTestAppender;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Level;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author Claudio Corsi
- */
-public class AMQ3567Test {
-
- private static Logger logger = LoggerFactory.getLogger(AMQ3567Test.class);
-
- private ActiveMQConnectionFactory factory;
- private Connection connection;
- private Session sessionWithListener, session;
- private Queue destination;
- private MessageConsumer consumer;
- private Thread thread;
- private BrokerService broker;
- private String connectionUri;
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void setUp() throws Exception {
- startBroker();
- initializeConsumer();
- startConsumer();
- }
-
- @Test
- public void runTest() throws Exception {
- produceSingleMessage();
- org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger("org.apache.activemq.util.ServiceSupport");
- final AtomicBoolean failed = new AtomicBoolean(false);
-
- Appender appender = new DefaultTestAppender() {
- @Override
- public void doAppend(LoggingEvent event) {
- if (event.getThrowableInformation() != null) {
- if (event.getThrowableInformation().getThrowable() instanceof InterruptedException) {
- InterruptedException ie = (InterruptedException) event.getThrowableInformation().getThrowable();
- if (ie.getMessage().startsWith("Could not stop service:")) {
- logger.info("Received an interrupted exception : ", ie);
- failed.set(true);
- }
- }
- }
- }
- };
- log4jLogger.addAppender(appender);
-
- Level level = log4jLogger.getLevel();
- log4jLogger.setLevel(Level.DEBUG);
-
- try {
- stopConsumer();
- stopBroker();
- if (failed.get()) {
- fail("An Interrupt exception was generated");
- }
-
- }
- finally {
- log4jLogger.setLevel(level);
- log4jLogger.removeAppender(appender);
- }
- }
-
- private void startBroker() throws Exception {
- broker = new BrokerService();
- broker.setDataDirectory("target/data");
- connectionUri = broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=30000&transport.closeAsync=false&transport.threadName&soTimeout=60000&transport.keepAlive=false&transport.useInactivityMonitor=false").getPublishableConnectString();
- broker.start(true);
- broker.waitUntilStarted();
- }
-
- private void stopBroker() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- private void initializeConsumer() throws JMSException {
- logger.info("Initializing the consumer messagor that will just not do anything....");
- factory = new ActiveMQConnectionFactory();
- factory.setBrokerURL("failover:(" + connectionUri + "?wireFormat.maxInactivityDuration=30000&keepAlive=true&soTimeout=60000)?jms.watchTopicAdvisories=false&jms.useAsyncSend=false&jms.dispatchAsync=true&jms.producerWindowSize=10485760&jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=true&InitialReconnectDelay=1000&maxReconnectDelay=10000&maxReconnectAttempts=400&useExponentialBackOff=true");
- connection = factory.createConnection();
- connection.start();
- sessionWithListener = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = sessionWithListener.createQueue("EMPTY.QUEUE");
- }
-
- private void startConsumer() throws Exception {
- logger.info("Starting the consumer");
- consumer = sessionWithListener.createConsumer(destination);
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- logger.info("Received a message: " + message);
- }
-
- });
-
- thread = new Thread(new Runnable() {
-
- private Session session;
-
- @Override
- public void run() {
- try {
- destination = session.createQueue("EMPTY.QUEUE");
- MessageConsumer consumer = session.createConsumer(destination);
- for (int cnt = 0; cnt < 2; cnt++) {
- Message message = consumer.receive(50000);
- logger.info("Received message: " + message);
- }
- }
- catch (JMSException e) {
- logger.debug("Received an exception while processing messages", e);
- }
- finally {
- try {
- session.close();
- }
- catch (JMSException e) {
- logger.debug("Received an exception while closing session", e);
- }
- }
- }
-
- public Runnable setSession(Session session) {
- this.session = session;
- return this;
- }
-
- }.setSession(session)) {
- {
- start();
- }
- };
- }
-
- private void stopConsumer() throws JMSException {
- logger.info("Stopping the consumer");
- try {
- thread.join();
- }
- catch (InterruptedException e) {
- logger.debug("Received an exception while waiting for thread to complete", e);
- }
- if (sessionWithListener != null) {
- sessionWithListener.close();
- }
- if (connection != null) {
- connection.stop();
- }
- }
-
- private void produceSingleMessage() throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
- factory.setBrokerURL(connectionUri);
- Connection connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("EMPTY.QUEUE");
- MessageProducer producer = session.createProducer(destination);
- producer.send(session.createTextMessage("Single Message"));
- producer.close();
- session.close();
- connection.close();
- }
-}