You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Archibald Whilshire <Ar...@gmx.net> on 2018/01/27 16:44:01 UTC

[ARTEMIS] A diverted message cannot be retried from DLQ

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.tests.integration.management;

import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.DayCounterInfo;
import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class QueueControlTest extends ManagementTestBase {

   // Broker under test; the tests use it to register address settings and to look up server-side queues.
   // NOTE(review): none of these three fields is assigned in the part of the file visible here
   // (presumably a @Before method does it) — confirm.
   private ActiveMQServer server;
   // Client session the tests use to create and delete queues and to create producers and consumers.
   private ClientSession session;
   // Locator from which the tests create additional session factories.
   private ServerLocator locator;

   /**
    * Verifies that the control of a freshly created queue reports the name, address, filter and
    * durability the queue was created with, and that the queue is not flagged as temporary.
    */
   @Test
   public void testAttributes() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final SimpleString selector = new SimpleString("color = 'blue'");
      final boolean expectDurable = RandomUtil.randomBoolean();

      session.createQueue(addressName, queueName, selector, expectDurable);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Every attribute read through the control must echo the creation arguments.
      Assert.assertEquals(queueName.toString(), control.getName());
      Assert.assertEquals(addressName.toString(), control.getAddress());
      Assert.assertEquals(selector.toString(), control.getFilter());
      Assert.assertEquals(expectDurable, control.isDurable());
      Assert.assertEquals(false, control.isTemporary());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that a queue created with a {@code null} filter exposes a {@code null} filter
    * through its management control.
    */
   @Test
   public void testGetNullFilter() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      // The third argument is the filter; null creates the queue without one.
      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      Assert.assertEquals(queueName.toString(), control.getName());
      Assert.assertEquals(null, control.getFilter());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that the control reports no dead letter address at first, and reports the configured
    * one once address settings matching the queue's address have been registered.
    */
   @Test
   public void testGetDeadLetterAddress() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final SimpleString expectedDla = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // No settings have been registered for this address so far.
      Assert.assertNull(control.getDeadLetterAddress());

      // Settings whose getter is overridden to always hand back the expected address.
      final AddressSettings overridden = new AddressSettings() {
         private static final long serialVersionUID = -4919035864731465338L;

         @Override
         public SimpleString getDeadLetterAddress() {
            return expectedDla;
         }
      };
      server.getAddressSettingsRepository().addMatch(addressName.toString(), overridden);

      Assert.assertEquals(expectedDla.toString(), control.getDeadLetterAddress());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that a dead letter address set on {@link AddressSettings} and registered for the
    * queue's address is the one returned by the control.
    */
   @Test
   public void testSetDeadLetterAddress() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final String expectedDla = RandomUtil.randomString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Configure the dead letter address through the regular setter and register the settings.
      final AddressSettings settings = new AddressSettings().setDeadLetterAddress(new SimpleString(expectedDla));
      server.getAddressSettingsRepository().addMatch(addressName.toString(), settings);

      Assert.assertEquals(expectedDla, control.getDeadLetterAddress());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that the control reports no expiry address at first, and reports the configured one
    * once address settings matching the queue's address have been registered.
    */
   @Test
   public void testGetExpiryAddress() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final SimpleString expectedExpiry = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // No settings have been registered for this address so far.
      Assert.assertNull(control.getExpiryAddress());

      // Settings whose getter is overridden to always hand back the expected address.
      final AddressSettings overridden = new AddressSettings() {
         private static final long serialVersionUID = 6745306517827764680L;

         @Override
         public SimpleString getExpiryAddress() {
            return expectedExpiry;
         }
      };
      server.getAddressSettingsRepository().addMatch(addressName.toString(), overridden);

      Assert.assertEquals(expectedExpiry.toString(), control.getExpiryAddress());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that an expiry address set on {@link AddressSettings} and registered for the queue's
    * address is returned both by the control and by the server-side queue.
    */
   @Test
   public void testSetExpiryAddress() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final String expectedExpiry = RandomUtil.randomString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Configure the expiry address through the regular setter and register the settings.
      final AddressSettings settings = new AddressSettings().setExpiryAddress(new SimpleString(expectedExpiry));
      server.getAddressSettingsRepository().addMatch(addressName.toString(), settings);

      Assert.assertEquals(expectedExpiry, control.getExpiryAddress());

      // The queue object held by the server must agree with the management view.
      final Queue brokerQueue = server.locateQueue(queueName);
      assertEquals(expectedExpiry, brokerQueue.getExpiryAddress().toString());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that the consumer count reported by the control goes from 0 to 1 when a consumer is
    * created on the queue and back to 0 when that consumer is closed.
    */
   @Test
   public void testGetConsumerCount() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Before any consumer is attached.
      Assert.assertEquals(0, control.getConsumerCount());

      // With one consumer attached.
      final ClientConsumer receiver = session.createConsumer(queueName);
      Assert.assertEquals(1, control.getConsumerCount());

      // After the consumer has been closed again.
      receiver.close();
      Assert.assertEquals(0, control.getConsumerCount());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that {@code listConsumersAsJSON()} yields a JSON array with one element while a
    * consumer is attached to the queue and an empty array after the consumer has been closed.
    */
   @Test
   public void testGetConsumerJSON() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      Assert.assertEquals(0, control.getConsumerCount());

      final ClientConsumer receiver = session.createConsumer(queueName);
      Assert.assertEquals(1, control.getConsumerCount());

      // Print the raw listing, then parse a fresh listing for the size check.
      System.out.println("Consumers: " + control.listConsumersAsJSON());
      JsonArray consumers = JsonUtil.readJsonArray(control.listConsumersAsJSON());
      assertEquals(1, consumers.size());

      receiver.close();
      Assert.assertEquals(0, control.getConsumerCount());

      // With the consumer gone the listing must be empty.
      consumers = JsonUtil.readJsonArray(control.listConsumersAsJSON());
      assertEquals(0, consumers.size());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that the message count is 0 for a new queue, 1 after a single send, and 0 again once
    * that message has been consumed through the {@code consumeMessages} helper.
    */
   @Test
   public void testGetMessageCount() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Empty queue.
      Assert.assertEquals(0, getMessageCount(control));

      // One message sent.
      final ClientProducer sender = session.createProducer(addressName);
      sender.send(session.createMessage(false));
      Assert.assertEquals(1, getMessageCount(control));

      // That message consumed.
      consumeMessages(1, session, queueName);
      Assert.assertEquals(0, getMessageCount(control));

      session.deleteQueue(queueName);
   }

   /**
    * Checks the "first message" attributes of the control: the JSON view of an empty queue, and,
    * after one message has been sent, that its timestamp lies between the moment before the send
    * and now, and that its reported age does not exceed the time elapsed since that timestamp.
    */
   @Test
   public void testGetFirstMessage() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(0, getMessageCount(control));

      // With no message in the queue the JSON view is an array holding one empty object.
      assertEquals("[{}]", control.getFirstMessageAsJSON());

      final long sendStart = System.currentTimeMillis();
      final ClientProducer sender = session.createProducer(addressName);
      sender.send(session.createMessage(false)
                         .putStringProperty("x", "valueX")
                         .putStringProperty("y", "valueY"));

      System.out.println("first:" + control.getFirstMessageAsJSON());

      // The timestamp must fall inside the [sendStart, now] window.
      final long timestamp = control.getFirstMessageTimestamp();
      System.out.println("first message timestamp: " + timestamp);
      assertTrue(timestamp >= sendStart);
      assertTrue(System.currentTimeMillis() >= timestamp);

      // The age was sampled before "now", so it cannot be larger than now - timestamp.
      final long age = control.getFirstMessageAge();
      System.out.println("first message age: " + age);
      assertTrue(age <= (System.currentTimeMillis() - timestamp));

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that the messages-added counter grows by one per send and is left unchanged when the
    * messages are consumed afterwards.
    */
   @Test
   public void testGetMessagesAdded() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(0, getMessagesAdded(control));

      // Send two messages, checking the counter after each one.
      final ClientProducer sender = session.createProducer(addressName);
      for (int sent = 1; sent <= 2; sent++) {
         sender.send(session.createMessage(false));
         Assert.assertEquals(sent, getMessagesAdded(control));
      }

      // Consuming must not decrease the counter.
      consumeMessages(2, session, queueName);
      Assert.assertEquals(2, getMessagesAdded(control));

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that the messages-acknowledged counter starts at 0 and grows by one each time a
    * message is sent and then consumed through the {@code consumeMessages} helper.
    */
   @Test
   public void testGetMessagesAcknowledged() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(0, control.getMessagesAcknowledged());

      // Two rounds of send-then-consume, checking the counter after each round.
      final ClientProducer sender = session.createProducer(addressName);
      for (int round = 1; round <= 2; round++) {
         sender.send(session.createMessage(false));
         consumeMessages(1, session, queueName);
         Assert.assertEquals(round, control.getMessagesAcknowledged());
      }

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that a message sent with a scheduled delivery time is counted as scheduled until
    * that time has passed, and is no longer counted as scheduled afterwards.
    */
   @Test
   public void testGetScheduledCount() throws Exception {
      final long delay = 500;
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(0, control.getScheduledCount());

      // Send one message whose delivery is deferred by 'delay' milliseconds.
      final ClientProducer sender = session.createProducer(addressName);
      final ClientMessage deferred = session.createMessage(false);
      deferred.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
      sender.send(deferred);

      // Poll for up to five seconds until the control reports exactly one scheduled message.
      final long deadline = System.currentTimeMillis() + 5000;
      for (;;) {
         if (deadline <= System.currentTimeMillis() || control.getScheduledCount() == 1) {
            break;
         }
         Thread.sleep(100);
      }

      Assert.assertEquals(1, control.getScheduledCount());
      consumeMessages(0, session, queueName);

      // Let the scheduled delivery time go by.
      Thread.sleep(delay * 2);

      Assert.assertEquals(0, control.getScheduledCount());
      consumeMessages(1, session, queueName);

      session.deleteQueue(queueName);
   }

   /**
    * Regression test for https://issues.jboss.org/browse/HORNETQ-1231.
    * <p>
    * Opens a session carrying the metadata a resource adapter adds ("resource-adapter" and
    * "jms-session"), sends {@code numMsg} messages and attaches a message handler that blocks on
    * the first message. While the handler is blocked, {@code listDeliveringMessagesAsJSON()} must
    * list all {@code numMsg} messages.
    */
   @Test
   public void testListDeliveringMessagesWithRASession() throws Exception {
      ServerLocator locator1 = createInVMNonHALocator().setBlockOnNonDurableSend(true).setConsumerWindowSize(10240).setAckBatchSize(0);
      ClientSessionFactory sf = locator1.createSessionFactory();
      final ClientSession transSession = sf.createSession(false, true, false);
      ClientConsumer consumer = null;
      SimpleString queue = null;
      final int numMsg = 10;

      try {
         // a session from RA does this
         transSession.addMetaData("resource-adapter", "inbound");
         transSession.addMetaData("jms-session", "");

         SimpleString address = RandomUtil.randomSimpleString();
         SimpleString queueName = RandomUtil.randomSimpleString();

         transSession.createQueue(address, queueName, null, false);
         // Remember the queue only once it exists, so cleanup never tries to delete a queue
         // that was not created.
         queue = queueName;

         final QueueControl queueControl = createManagementControl(address, queueName);

         ClientProducer producer = transSession.createProducer(address);

         for (int i = 0; i < numMsg; i++) {
            ClientMessage message = transSession.createMessage(false);
            message.putIntProperty(new SimpleString("seqno"), i);
            producer.send(message);
         }

         consumer = transSession.createConsumer(queueName);
         transSession.start();

         /**
          * the following latches are used to make sure that
          *
          * 1. the first call on queueControl happens after the
          * first message arrived at the message handler.
          *
          * 2. the message handler wait on the first message until
          * the queueControl returns the right/wrong result.
          *
          * 3. the test exits after all messages are received.
          *
          */
         final CountDownLatch latch1 = new CountDownLatch(1);
         final CountDownLatch latch2 = new CountDownLatch(1);
         // One count per message sent, so the latch stays correct if numMsg is changed.
         final CountDownLatch latch3 = new CountDownLatch(numMsg);

         consumer.setMessageHandler(new MessageHandler() {

            @Override
            public void onMessage(ClientMessage message) {
               try {
                  message.acknowledge();
               } catch (ActiveMQException e1) {
                  e1.printStackTrace();
               }
               latch1.countDown();
               try {
                  latch2.await(10, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                  e.printStackTrace();
                  // Keep the interrupt visible to whoever owns this thread.
                  Thread.currentThread().interrupt();
               }
               latch3.countDown();
            }
         });

         // Fail right away if the handler never saw a message; the checks below would be meaningless.
         Assert.assertTrue("no message reached the handler within 10 seconds", latch1.await(10, TimeUnit.SECONDS));
         //now we know the ack of the message is sent but to make sure
         //the server has received it, we try 5 times
         int n = 0;
         for (int i = 0; i < 5; i++) {
            Thread.sleep(1000);
            String jsonStr = queueControl.listDeliveringMessagesAsJSON();

            n = countOccurrencesOf(jsonStr, "seqno");

            if (n == numMsg) {
               break;
            }
         }

         assertEquals(numMsg, n);

         // Release the handler so it can work through the remaining messages.
         latch2.countDown();

         Assert.assertTrue("handler did not receive all messages within 10 seconds", latch3.await(10, TimeUnit.SECONDS));

         transSession.commit();
      } finally {
         // consumer and queue are still null when setup failed early; guarding them keeps the
         // original failure from being replaced by a NullPointerException raised during cleanup.
         if (consumer != null) {
            consumer.close();
         }
         if (queue != null) {
            transSession.deleteQueue(queue);
         }
         transSession.close();
         locator1.close();
      }
   }

   /**
    * Verifies the delivering view of a queue: two consumers on two different sessions each receive
    * one message without acknowledging it, after which the server-side queue must report two
    * delivering messages and two consumers, and {@code listDeliveringMessages()} must return a map
    * with two entries.
    */
   @Test
   public void testListDeliveringMessages() throws Exception {
      SimpleString address = RandomUtil.randomSimpleString();
      SimpleString queue = RandomUtil.randomSimpleString();
      int intValue = RandomUtil.randomInt();
      session.createQueue(address, queue, null, false);

      Queue srvqueue = server.locateQueue(queue);

      QueueControl queueControl = createManagementControl(address, queue);

      ClientProducer producer = session.createProducer(address);
      ClientMessage message = session.createMessage(false);
      message.putIntProperty(new SimpleString("key"), intValue);
      producer.send(message);
      producer.send(session.createMessage(false));

      ClientConsumer consumer = session.createConsumer(queue);
      session.start();
      ClientMessage msgRec = consumer.receive(5000);
      assertNotNull(msgRec);
      // Expected value goes first so that a failure message reads the right way round.
      assertEquals(intValue, msgRec.getIntProperty("key").intValue());

      ClientSessionFactory sf2 = createSessionFactory(locator);
      // try-with-resources closes the second session even when one of the assertions below fails.
      try (ClientSession session2 = sf2.createSession(false, true, false)) {
         ClientConsumer consumer2 = session2.createConsumer(queue);
         session2.start();
         ClientMessage msgRec2 = consumer2.receive(5000);
         assertNotNull(msgRec2);

         assertEquals(2, srvqueue.getDeliveringCount());
         assertEquals(2, srvqueue.getConsumerCount());

         System.out.println(queueControl.listDeliveringMessagesAsJSON());

         Map<String, Map<String, Object>[]> deliveringMap = queueControl.listDeliveringMessages();
         assertEquals(2, deliveringMap.size());

         consumer.close();
         consumer2.close();
      }

      session.deleteQueue(queue);
   }

   /**
    * Verifies that {@code listScheduledMessages()} returns only the message that carries a
    * scheduled delivery time, and returns nothing once that time has passed.
    */
   @Test
   public void testListScheduledMessages() throws Exception {
      final long delay = 2000;
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final int keyValue = RandomUtil.randomInt();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      final ClientProducer sender = session.createProducer(addressName);

      // First message: delivery deferred by 'delay' ms, tagged with a random "key" property.
      final ClientMessage deferred = session.createMessage(false);
      deferred.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
      deferred.putIntProperty(new SimpleString("key"), keyValue);
      sender.send(deferred);

      // Second message: not scheduled, so it must not show up in the listing.
      sender.send(session.createMessage(false));

      Map<String, Object>[] scheduled = control.listScheduledMessages();
      Assert.assertEquals(1, scheduled.length);
      final Object listedKey = scheduled[0].get("key");
      Assert.assertEquals(keyValue, Integer.parseInt(listedKey.toString()));

      // Let the scheduled delivery time go by.
      Thread.sleep(delay + 500);

      scheduled = control.listScheduledMessages();
      Assert.assertEquals(0, scheduled.length);

      consumeMessages(2, session, queueName);

      session.deleteQueue(queueName);
   }

   /**
    * JSON counterpart of the scheduled-message listing test: {@code listScheduledMessagesAsJSON()}
    * must contain only the message that carries a scheduled delivery time, and must be an empty
    * array once that time has passed.
    */
   @Test
   public void testListScheduledMessagesAsJSON() throws Exception {
      final long delay = 2000;
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final int keyValue = RandomUtil.randomInt();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      final ClientProducer sender = session.createProducer(addressName);

      // First message: delivery deferred by 'delay' ms, tagged with a random "key" property.
      final ClientMessage deferred = session.createMessage(false);
      deferred.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
      deferred.putIntProperty(new SimpleString("key"), keyValue);
      sender.send(deferred);

      // Second message: not scheduled, so it must not show up in the listing.
      sender.send(session.createMessage(false));

      String json = control.listScheduledMessagesAsJSON();
      Assert.assertNotNull(json);
      JsonArray listed = JsonUtil.readJsonArray(json);
      Assert.assertEquals(1, listed.size());

      // Strip any double quotes from the JSON value before parsing it as an int.
      final String rawKey = listed.getJsonObject(0).get("key").toString();
      final int parsedKey = Integer.parseInt(rawKey.replaceAll("\"", ""));
      Assert.assertEquals(keyValue, parsedKey);

      // Let the scheduled delivery time go by.
      Thread.sleep(delay + 500);

      json = control.listScheduledMessagesAsJSON();
      Assert.assertNotNull(json);
      listed = JsonUtil.readJsonArray(json);
      Assert.assertEquals(0, listed.size());

      consumeMessages(2, session, queueName);

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that the delivering count is 0 while a message merely sits in the queue, 1 once a
    * consumer has received it, and 0 again after the message is acknowledged and the session
    * committed.
    */
   @Test
   public void testGetDeliveringCount() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);

      final ClientProducer sender = session.createProducer(addressName);
      sender.send(session.createMessage(false));

      // The message is queued but nobody has received it yet.
      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(0, control.getDeliveringCount());

      // Received but not acknowledged: the message counts as delivering.
      final ClientConsumer receiver = session.createConsumer(queueName);
      final ClientMessage received = receiver.receive(500);
      Assert.assertNotNull(received);
      Assert.assertEquals(1, control.getDeliveringCount());

      // Acknowledged and committed: nothing is delivering any more.
      received.acknowledge();
      session.commit();
      Assert.assertEquals(0, control.getDeliveringCount());

      receiver.close();
      session.deleteQueue(queueName);
   }

   /**
    * Runs {@code THREAD_COUNT} producer threads and {@code THREAD_COUNT} consumer threads against
    * one ANYCAST queue, each handling {@code MSG_COUNT} messages on its own session. Once all of
    * them are done the queue must be empty, have no consumers and nothing delivering, and both the
    * messages-added and messages-acknowledged counters must equal
    * {@code THREAD_COUNT * MSG_COUNT}.
    */
   @Test
   public void testMessagesAddedAndMessagesAcknowledged() throws Exception {
      final int THREAD_COUNT = 5;
      final int MSG_COUNT = 1000;

      CountDownLatch producerCountDown = new CountDownLatch(THREAD_COUNT);
      CountDownLatch consumerCountDown = new CountDownLatch(THREAD_COUNT);

      ExecutorService producerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
      ExecutorService consumerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);

      SimpleString address = RandomUtil.randomSimpleString();
      SimpleString queue = RandomUtil.randomSimpleString();

      try {
         session.createQueue(address, RoutingType.ANYCAST, queue, null, false);

         for (int i = 0; i < THREAD_COUNT; i++) {
            producerExecutor.submit(() -> {
               try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientProducer producer = session.createProducer(address)) {
                  for (int j = 0; j < MSG_COUNT; j++) {
                     producer.send(session.createMessage(false));
                     Thread.sleep(5);
                  }
                  // Counted down only after every message was sent; a failing worker leaves the latch open.
                  producerCountDown.countDown();
               } catch (Exception e) {
                  e.printStackTrace();
               }
            });
         }

         for (int i = 0; i < THREAD_COUNT; i++) {
            consumerExecutor.submit(() -> {
               try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientConsumer consumer = session.createConsumer(queue)) {
                  session.start();
                  for (int j = 0; j < MSG_COUNT; j++) {
                     ClientMessage message = consumer.receive(500);
                     // An AssertionError raised here is not an Exception, so the catch below does not
                     // see it; the worker ends without counting down and the await further down fails.
                     Assert.assertNotNull(message);
                     message.acknowledge();
                  }
                  session.commit();
                  consumerCountDown.countDown();
               } catch (Exception e) {
                  e.printStackTrace();
               }
            });
         }

         // await() returns false on timeout. Checking it turns a stalled or failed worker into an
         // explicit failure instead of a confusing counter mismatch in the assertions below.
         Assert.assertTrue("producers did not finish within 30 seconds", producerCountDown.await(30, TimeUnit.SECONDS));
         Assert.assertTrue("consumers did not finish within 30 seconds", consumerCountDown.await(30, TimeUnit.SECONDS));

         QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
         Assert.assertEquals(0, queueControl.getMessageCount());
         Assert.assertEquals(0, queueControl.getConsumerCount());
         Assert.assertEquals(0, queueControl.getDeliveringCount());
         Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAdded());
         Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAcknowledged());

         session.deleteQueue(queue);
      } finally {
         shutdownExecutor(producerExecutor);
         shutdownExecutor(consumerExecutor);
      }
   }

   /**
    * Shuts the given executor down: requests an orderly shutdown, waits up to five seconds for
    * running tasks to finish, and finally calls {@code shutdownNow()} in every case.
    *
    * @param executor the executor to stop
    */
   private void shutdownExecutor(ExecutorService executor) {
      try {
         executor.shutdown();
         executor.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         // Stop waiting, but restore the interrupt status so the caller can still observe it.
         Thread.currentThread().interrupt();
      } finally {
         executor.shutdownNow();
      }
   }

   /**
    * Verifies that {@code listMessagesAsJSON(null)} lists the single message in the queue together
    * with its "key" property, and yields an empty array once the message has been consumed.
    */
   @Test
   public void testListMessagesAsJSONWithNullFilter() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();
      final int keyValue = RandomUtil.randomInt();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Send one message tagged with a random "key" property.
      final ClientProducer sender = session.createProducer(addressName);
      final ClientMessage tagged = session.createMessage(false);
      tagged.putIntProperty(new SimpleString("key"), keyValue);
      sender.send(tagged);

      String json = control.listMessagesAsJSON(null);
      Assert.assertNotNull(json);
      JsonArray listed = JsonUtil.readJsonArray(json);
      Assert.assertEquals(1, listed.size());

      // Strip any double quotes from the JSON value before parsing it.
      final String rawKey = listed.getJsonObject(0).get("key").toString();
      final long parsedKey = Long.parseLong(rawKey.replaceAll("\"", ""));
      Assert.assertEquals(keyValue, parsedKey);

      consumeMessages(1, session, queueName);

      // Nothing is left to list after the message has been consumed.
      json = control.listMessagesAsJSON(null);
      Assert.assertNotNull(json);
      listed = JsonUtil.readJsonArray(json);
      Assert.assertEquals(0, listed.size());

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that {@code listMessages(filter)} returns only the message whose "key" property
    * equals the value named in the filter, and returns nothing after both messages were consumed.
    */
   @Test
   public void testListMessagesWithFilter() throws Exception {
      final SimpleString key = new SimpleString("key");
      final long wanted = RandomUtil.randomLong();
      final long unwanted = wanted + 1;
      final String filter = key + " =" + wanted;

      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      final ClientProducer sender = session.createProducer(addressName);

      // One message that satisfies the filter ...
      final ClientMessage hit = session.createMessage(false);
      hit.putLongProperty(key, wanted);
      sender.send(hit);

      // ... and one that does not.
      final ClientMessage miss = session.createMessage(false);
      miss.putLongProperty(key, unwanted);
      sender.send(miss);

      Map<String, Object>[] listed = control.listMessages(filter);
      Assert.assertEquals(1, listed.length);
      final Object listedKey = listed[0].get("key");
      Assert.assertEquals(wanted, Long.parseLong(listedKey.toString()));

      consumeMessages(2, session, queueName);

      listed = control.listMessages(filter);
      Assert.assertEquals(0, listed.length);

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that {@code listMessages(null)} returns every message in the queue, and returns
    * nothing once the messages have been consumed.
    */
   @Test
   public void testListMessagesWithNullFilter() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Put two plain messages into the queue.
      final ClientProducer sender = session.createProducer(addressName);
      for (int i = 0; i < 2; i++) {
         sender.send(session.createMessage(false));
      }

      Map<String, Object>[] listed = control.listMessages(null);
      Assert.assertEquals(2, listed.length);

      consumeMessages(2, session, queueName);

      listed = control.listMessages(null);
      Assert.assertEquals(0, listed.length);

      session.deleteQueue(queueName);
   }

   /**
    * Verifies that {@code listMessages("")} returns every message in the queue, and returns
    * nothing once the messages have been consumed.
    */
   @Test
   public void testListMessagesWithEmptyFilter() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      // Put two plain messages into the queue.
      final ClientProducer sender = session.createProducer(addressName);
      for (int i = 0; i < 2; i++) {
         sender.send(session.createMessage(false));
      }

      Map<String, Object>[] listed = control.listMessages("");
      Assert.assertEquals(2, listed.length);

      consumeMessages(2, session, queueName);

      listed = control.listMessages("");
      Assert.assertEquals(0, listed.length);

      session.deleteQueue(queueName);
   }

   /**
    * JSON counterpart of the filtered listing test: {@code listMessagesAsJSON(filter)} must
    * contain only the message whose "key" property equals the value named in the filter, and must
    * be an empty array after both messages were consumed.
    */
   @Test
   public void testListMessagesAsJSONWithFilter() throws Exception {
      final SimpleString key = new SimpleString("key");
      final long wanted = RandomUtil.randomLong();
      final long unwanted = wanted + 1;
      final String filter = key + " =" + wanted;

      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final QueueControl control = createManagementControl(addressName, queueName);

      final ClientProducer sender = session.createProducer(addressName);

      // One message that satisfies the filter ...
      final ClientMessage hit = session.createMessage(false);
      hit.putLongProperty(key, wanted);
      sender.send(hit);

      // ... and one that does not.
      final ClientMessage miss = session.createMessage(false);
      miss.putLongProperty(key, unwanted);
      sender.send(miss);

      String json = control.listMessagesAsJSON(filter);
      Assert.assertNotNull(json);
      JsonArray listed = JsonUtil.readJsonArray(json);
      Assert.assertEquals(1, listed.size());

      // Strip any double quotes from the JSON value before parsing it.
      final String rawKey = listed.getJsonObject(0).get("key").toString();
      final long parsedKey = Long.parseLong(rawKey.replaceAll("\"", ""));
      Assert.assertEquals(wanted, parsedKey);

      consumeMessages(2, session, queueName);

      json = control.listMessagesAsJSON(filter);
      Assert.assertNotNull(json);
      listed = JsonUtil.readJsonArray(json);
      Assert.assertEquals(0, listed.size());

      session.deleteQueue(queueName);
   }

   /**
    * Test retry - get a message from DLQ and put on original queue.
    * <ol>
    * <li>send a message to the original queue; its address allows a single delivery attempt
    * and names a dead letter address</li>
    * <li>receive the message and roll the session back so it ends up on the DLQ</li>
    * <li>retry the message through the DLQ's management control</li>
    * <li>check the DLQ is empty and the message can be consumed from the original queue again</li>
    * </ol>
    */
   @Test
   public void testRetryMessage() throws Exception {
      final SimpleString dla = new SimpleString("DLA");
      final SimpleString qName = new SimpleString("q1");
      final SimpleString adName = new SimpleString("ad1");
      final SimpleString dlq = new SimpleString("DLQ1");
      final String sampleText = "Put me on DLQ";

      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);

      session.createQueue(dla, dlq, null, false);
      session.createQueue(adName, qName, null, false);

      // Send message to queue.
      ClientProducer producer = session.createProducer(adName);
      producer.send(createTextMessage(session, sampleText));
      session.start();

      ClientConsumer clientConsumer = session.createConsumer(qName);
      ClientMessage clientMessage = clientConsumer.receive(500);
      // Check for null before touching the message so that a missed delivery is
      // reported as an assertion failure rather than a NullPointerException.
      Assert.assertNotNull(clientMessage);
      clientMessage.acknowledge();

      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());

      // force a rollback to DLQ
      session.rollback();
      clientMessage = clientConsumer.receiveImmediate();
      Assert.assertNull(clientMessage);

      QueueControl queueControl = createManagementControl(dla, dlq);
      Assert.assertEquals(1, getMessageCount(queueControl));
      final long messageID = getFirstMessageId(queueControl);

      // Retry the message - i.e. it should go from DLQ to original Queue.
      Assert.assertTrue(queueControl.retryMessage(messageID));

      // Assert DLQ is empty...
      Assert.assertEquals(0, getMessageCount(queueControl));

      // .. and that the message is now on the original queue once more.
      clientMessage = clientConsumer.receive(500);
      Assert.assertNotNull(clientMessage);
      clientMessage.acknowledge();

      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());

      clientConsumer.close();
   }

   /**
    * Test retry - get a diverted message from DLQ and put on original queue.
    * <ol>
    * <li>deploy a non-exclusive divert that forwards from the topic address to the queue's address</li>
    * <li>send a message to the topic and receive it from the queue</li>
    * <li>roll the session back so the message ends up on the DLQ
    * (the queue's address allows a single delivery attempt)</li>
    * <li>retry the message through the DLQ's management control</li>
    * <li>check the DLQ is empty and the message can be consumed from the queue again</li>
    * </ol>
    */
   @Test
   public void testRetryDivertedMessage() throws Exception {
      final SimpleString dla = new SimpleString("DLA");
      final SimpleString dlq = new SimpleString("DLQ1");
      final SimpleString qName = new SimpleString("q1");
      final SimpleString adName = new SimpleString("ad1");
      final SimpleString topicName = new SimpleString("TA1");
      final String sampleText = "Put me on DLQ";

      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);

      // create target queue, DLQ and source topic
      session.createQueue(dla, dlq, null, false);
      session.createQueue(adName, qName, null, false);
      session.createAddress(topicName, RoutingType.MULTICAST, false);

      // Reuse the address constants declared above instead of repeating their literal values.
      DivertConfiguration divert = new DivertConfiguration()
         .setName("local-divert")
         .setRoutingName("some-name")
         .setAddress(topicName.toString())
         .setForwardingAddress(adName.toString())
         .setExclusive(false);
      server.deployDivert(divert);

      // Send message to topic.
      ClientProducer producer = session.createProducer(topicName);
      producer.send(createTextMessage(session, sampleText));
      session.start();

      ClientConsumer clientConsumer = session.createConsumer(qName);
      ClientMessage clientMessage = clientConsumer.receive(500);
      // Check for null before touching the message so that a missed delivery is
      // reported as an assertion failure rather than a NullPointerException.
      Assert.assertNotNull(clientMessage);
      clientMessage.acknowledge();

      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());

      // force a rollback to DLQ
      session.rollback();
      clientMessage = clientConsumer.receiveImmediate();
      Assert.assertNull(clientMessage);

      QueueControl queueControl = createManagementControl(dla, dlq);
      Assert.assertEquals(1, getMessageCount(queueControl));
      final long messageID = getFirstMessageId(queueControl);

      // Retry the message - i.e. it should go from DLQ to original Queue.
      Assert.assertTrue(queueControl.retryMessage(messageID));

      // Assert DLQ is empty...
      Assert.assertEquals(0, getMessageCount(queueControl));

      // .. and that the message is now on the original queue once more.
      clientMessage = clientConsumer.receive(500);
      Assert.assertNotNull(clientMessage); // fails because of AMQ222196 !!!
      clientMessage.acknowledge();

      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());

      clientConsumer.close();
   }
   
   /**
    * Test retry of multiple messages from DLQ to original queue.
    * <ol>
    * <li>send several messages to the original queue; its address allows a single delivery
    * attempt and names a dead letter address</li>
    * <li>receive each message and roll the session back so they all end up on the DLQ</li>
    * <li>retry all messages through the DLQ's management control</li>
    * <li>check the DLQ is empty and every message can be consumed from the original queue again</li>
    * </ol>
    */
   @Test
   public void testRetryMultipleMessages() throws Exception {
      final SimpleString dla = new SimpleString("DLA");
      final SimpleString qName = new SimpleString("q1");
      final SimpleString adName = new SimpleString("ad1");
      final SimpleString dlq = new SimpleString("DLQ1");
      final String sampleText = "Put me on DLQ";
      final int numMessagesToTest = 10;

      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);

      session.createQueue(dla, dlq, null, false);
      session.createQueue(adName, qName, null, false);

      // Send messages to queue.
      ClientProducer producer = session.createProducer(adName);
      for (int i = 0; i < numMessagesToTest; i++) {
         producer.send(createTextMessage(session, sampleText));
      }

      session.start();

      // Read and rollback all messages to DLQ
      ClientConsumer clientConsumer = session.createConsumer(qName);
      for (int i = 0; i < numMessagesToTest; i++) {
         ClientMessage clientMessage = clientConsumer.receive(500);
         // Check for null before touching the message so that a missed delivery is
         // reported as an assertion failure rather than a NullPointerException.
         Assert.assertNotNull(clientMessage);
         clientMessage.acknowledge();
         Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
         session.rollback();
      }

      Assert.assertNull(clientConsumer.receiveImmediate());

      QueueControl dlqQueueControl = createManagementControl(dla, dlq);
      Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));

      // Retry all messages - i.e. they should go from DLQ to original Queue.
      Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages());

      // Assert DLQ is empty...
      Assert.assertEquals(0, getMessageCount(dlqQueueControl));

      // .. and that the messages are now on the original queue once more.
      for (int i = 0; i < numMessagesToTest; i++) {
         ClientMessage clientMessage = clientConsumer.receive(500);
         Assert.assertNotNull(clientMessage);
         clientMessage.acknowledge();
         Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
      }

      clientConsumer.close();
   }

   /**
    * <ol>
    * <li>send a message to queue</li>
    * <li>move all messages from queue to otherQueue using management method</li>
    * <li>check there is no message to consume from queue</li>
    * <li>consume the message from otherQueue</li>
    * </ol>
    */
   @Test
   public void testMoveMessages() throws Exception {
      SimpleString address = RandomUtil.randomSimpleString();
      SimpleString queue = RandomUtil.randomSimpleString();
      SimpleString otherAddress = RandomUtil.randomSimpleString();
      SimpleString otherQueue = RandomUtil.randomSimpleString();

      session.createQueue(address, queue, null, false);
      session.createQueue(otherAddress, otherQueue, null, false);
      ClientProducer producer = session.createProducer(address);

      // send on queue
      ClientMessage message = session.createMessage(false);
      SimpleString key = RandomUtil.randomSimpleString();
      long value = RandomUtil.randomLong();
      message.putLongProperty(key, value);
      producer.send(message);

      QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertEquals(1, getMessageCount(queueControl));

      // move all messages to otherQueue (a null filter selects every message)
      int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString());
      Assert.assertEquals(1, movedMessagesCount);
      Assert.assertEquals(0, getMessageCount(queueControl));

      // check there is no message to consume from queue
      consumeMessages(0, session, queue);

      // consume the message from otherQueue
      ClientConsumer otherConsumer = session.createConsumer(otherQueue);
      ClientMessage m = otherConsumer.receive(500);
      // Fail with a clear assertion, not a NullPointerException, if nothing arrived.
      Assert.assertNotNull(m);
      Assert.assertEquals(value, m.getObjectProperty(key));

      m.acknowledge();

      session.deleteQueue(queue);
      otherConsumer.close();
      session.deleteQueue(otherQueue);
   }

   /**
    * Moves messages between three queues bound to the same multicast address and checks
    * the message counts after every move, then consumes all three messages from queue A.
    */
   @Test
   public void testMoveMessages2() throws Exception {
      SimpleString address = RandomUtil.randomSimpleString();
      SimpleString queueA = new SimpleString("A");
      SimpleString queueB = new SimpleString("B");
      SimpleString queueC = new SimpleString("C");

      server.createQueue(address, RoutingType.MULTICAST, queueA, null, true, false);
      server.createQueue(address, RoutingType.MULTICAST, queueB, null, true, false);
      server.createQueue(address, RoutingType.MULTICAST, queueC, null, true, false);

      QueueControl queueControlA = createManagementControl(address, queueA);
      QueueControl queueControlB = createManagementControl(address, queueB);
      QueueControl queueControlC = createManagementControl(address, queueC);

      // send two messages on queueA
      queueControlA.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
      queueControlA.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody2".getBytes()), true, "myUser", "myPassword");

      Assert.assertEquals(2, getMessageCount(queueControlA));
      Assert.assertEquals(0, getMessageCount(queueControlB));
      Assert.assertEquals(0, getMessageCount(queueControlC));

      // move 2 messages from queueA to queueB
      queueControlA.moveMessages(null, queueB.toString());
      Thread.sleep(500);
      Assert.assertEquals(0, getMessageCount(queueControlA));
      Assert.assertEquals(2, getMessageCount(queueControlB));

      // send a third message on queueA and move it to queueC
      queueControlA.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody3".getBytes()), true, "myUser", "myPassword");
      Assert.assertEquals(1, getMessageCount(queueControlA));
      queueControlA.moveMessages(null, queueC.toString());
      Assert.assertEquals(1, getMessageCount(queueControlC));
      Assert.assertEquals(0, getMessageCount(queueControlA));

      // move all messages back to A
      queueControlB.moveMessages(null, queueA.toString());
      Assert.assertEquals(2, getMessageCount(queueControlA));
      Assert.assertEquals(0, getMessageCount(queueControlB));

      queueControlC.moveMessages(null, queueA.toString());
      Assert.assertEquals(3, getMessageCount(queueControlA));
      Assert.assertEquals(0, getMessageCount(queueControlC));

      // consume the messages from queueA
      ClientConsumer consumer = session.createConsumer(queueA);
      ClientMessage m1 = consumer.receive(500);
      ClientMessage m2 = consumer.receive(500);
      ClientMessage m3 = consumer.receive(500);

      // Fail with a clear assertion, not a NullPointerException, if a message is missing.
      Assert.assertNotNull(m1);
      Assert.assertNotNull(m2);
      Assert.assertNotNull(m3);

      m1.acknowledge();
      m2.acknowledge();
      m3.acknowledge();

      consumer.close();
      session.deleteQueue(queueA);
      session.deleteQueue(queueB);
      session.deleteQueue(queueC);

   }

   /**
    * Checks that a bulk move to a queue that was never created throws, and that the
    * message is still on the source queue afterwards.
    */
   @Test
   public void testMoveMessagesToUnknownQueue() throws Exception {
      final SimpleString sourceAddress = RandomUtil.randomSimpleString();
      final SimpleString sourceQueue = RandomUtil.randomSimpleString();
      final SimpleString missingQueue = RandomUtil.randomSimpleString();

      session.createQueue(sourceAddress, sourceQueue, null, false);
      final ClientProducer sender = session.createProducer(sourceAddress);

      // Put one message carrying a random long property on the source queue.
      final ClientMessage outgoing = session.createMessage(false);
      final SimpleString propertyName = RandomUtil.randomSimpleString();
      final long propertyValue = RandomUtil.randomLong();
      outgoing.putLongProperty(propertyName, propertyValue);
      sender.send(outgoing);

      final QueueControl control = createManagementControl(sourceAddress, sourceQueue);
      Assert.assertEquals(1, getMessageCount(control));

      // Attempt to move everything to a queue that does not exist.
      try {
         control.moveMessages(null, missingQueue.toString());
         Assert.fail("operation must fail if the other queue does not exist");
      } catch (Exception expected) {
         // Expected: the move is rejected. Assert.fail raises an AssertionError, which is not caught here.
      }

      // The message must not have left the source queue.
      Assert.assertEquals(1, getMessageCount(control));

      consumeMessages(1, session, sourceQueue);

      session.deleteQueue(sourceQueue);
   }

   /**
    * <ol>
    * <li>send 2 messages to queue</li>
    * <li>move messages from queue to otherQueue using the management method <em>with a filter</em></li>
    * <li>consume the message which <strong>did not</strong> match the filter from queue</li>
    * <li>consume the message which <strong>did</strong> match the filter from otherQueue</li>
    * </ol>
    */
   @Test
   public void testMoveMessagesWithFilter() throws Exception {
      final SimpleString key = new SimpleString("key");
      final long wanted = RandomUtil.randomLong();
      final long unwanted = wanted + 1;

      final SimpleString sourceAddress = RandomUtil.randomSimpleString();
      final SimpleString sourceQueue = RandomUtil.randomSimpleString();
      final SimpleString targetAddress = RandomUtil.randomSimpleString();
      final SimpleString targetQueue = RandomUtil.randomSimpleString();

      session.createQueue(sourceAddress, sourceQueue, null, false);
      session.createQueue(targetAddress, targetQueue, null, false);
      final ClientProducer sender = session.createProducer(sourceAddress);

      // One message that satisfies the filter and one that does not.
      final ClientMessage hit = session.createMessage(false);
      hit.putLongProperty(key, wanted);
      sender.send(hit);

      final ClientMessage miss = session.createMessage(false);
      miss.putLongProperty(key, unwanted);
      sender.send(miss);

      final QueueControl control = createManagementControl(sourceAddress, sourceQueue);
      Assert.assertEquals(2, getMessageCount(control));

      // Only the matching message should be moved to the target queue.
      final String filter = key + " =" + wanted;
      final int movedCount = control.moveMessages(filter, targetQueue.toString());
      Assert.assertEquals(1, movedCount);
      Assert.assertEquals(1, getMessageCount(control));

      // The non-matching message is still on the source queue.
      final ClientConsumer sourceConsumer = session.createConsumer(sourceQueue);
      ClientMessage received = sourceConsumer.receive(500);
      Assert.assertNotNull(received);
      Assert.assertEquals(unwanted, received.getObjectProperty(key));

      // The matching message is now on the target queue.
      final ClientConsumer targetConsumer = session.createConsumer(targetQueue);
      received = targetConsumer.receive(500);
      Assert.assertNotNull(received);
      Assert.assertEquals(wanted, received.getObjectProperty(key));

      received.acknowledge();

      sourceConsumer.close();
      session.deleteQueue(sourceQueue);
      targetConsumer.close();
      session.deleteQueue(targetQueue);
   }

   /**
    * Moves a single message, selected by its message ID, from one queue to another and
    * checks that each queue ends up holding exactly one message.
    */
   @Test
   public void testMoveMessage() throws Exception {
      final SimpleString sourceAddress = RandomUtil.randomSimpleString();
      final SimpleString sourceQueue = RandomUtil.randomSimpleString();
      final SimpleString targetAddress = RandomUtil.randomSimpleString();
      final SimpleString targetQueue = RandomUtil.randomSimpleString();

      session.createQueue(sourceAddress, sourceQueue, null, false);
      session.createQueue(targetAddress, targetQueue, null, false);
      final ClientProducer sender = session.createProducer(sourceAddress);

      // Two messages go to the source queue.
      for (int n = 0; n < 2; n++) {
         sender.send(session.createMessage(false));
      }

      final QueueControl sourceControl = createManagementControl(sourceAddress, sourceQueue);
      final QueueControl targetControl = createManagementControl(targetAddress, targetQueue);
      Assert.assertEquals(2, getMessageCount(sourceControl));
      Assert.assertEquals(0, getMessageCount(targetControl));

      // Message IDs are assigned by the server, so read one back from the listing.
      final Map<String, Object>[] listed = sourceControl.listMessages(null);
      Assert.assertEquals(2, listed.length);
      final long firstId = (Long) listed[0].get("messageID");

      Assert.assertTrue(sourceControl.moveMessage(firstId, targetQueue.toString()));
      Assert.assertEquals(1, getMessageCount(sourceControl));
      Assert.assertEquals(1, getMessageCount(targetControl));

      consumeMessages(1, session, sourceQueue);
      consumeMessages(1, session, targetQueue);

      session.deleteQueue(sourceQueue);
      session.deleteQueue(targetQueue);
   }

   /**
    * Checks that moving a single message, selected by its message ID, to a queue that
    * was never created throws, and that the message is still on the source queue afterwards.
    */
   @Test
   public void testMoveMessageToUnknownQueue() throws Exception {
      final SimpleString sourceAddress = RandomUtil.randomSimpleString();
      final SimpleString sourceQueue = RandomUtil.randomSimpleString();
      final SimpleString missingQueue = RandomUtil.randomSimpleString();

      session.createQueue(sourceAddress, sourceQueue, null, false);
      final ClientProducer sender = session.createProducer(sourceAddress);

      // A single message goes to the source queue.
      sender.send(session.createMessage(false));

      final QueueControl control = createManagementControl(sourceAddress, sourceQueue);
      Assert.assertEquals(1, getMessageCount(control));

      // Message IDs are assigned by the server, so read it back from the listing.
      final Map<String, Object>[] listed = control.listMessages(null);
      Assert.assertEquals(1, listed.length);
      final long onlyId = (Long) listed[0].get("messageID");

      // Attempt to move that message to a queue that does not exist.
      try {
         control.moveMessage(onlyId, missingQueue.toString());
         Assert.fail("operation must fail if the other queue does not exist");
      } catch (Exception expected) {
         // Expected: the move is rejected. Assert.fail raises an AssertionError, which is not caught here.
      }

      // The message must not have left the source queue.
      Assert.assertEquals(1, getMessageCount(control));

      consumeMessages(1, session, sourceQueue);

      session.deleteQueue(sourceQueue);
   }

   /**
    * <ol>
    * <li>send 2 messages to queue</li>
    * <li>remove messages from queue using the management method <em>with a filter</em></li>
    * <li>check there is only one message left to consume from queue</li>
    * </ol>
    */
   @Test
   public void testRemoveMessages() throws Exception {
      final SimpleString key = new SimpleString("key");
      final long wanted = RandomUtil.randomLong();
      final long unwanted = wanted + 1;

      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // One message that satisfies the filter and one that does not.
      final ClientMessage hit = session.createMessage(false);
      hit.putLongProperty(key, wanted);
      sender.send(hit);

      final ClientMessage miss = session.createMessage(false);
      miss.putLongProperty(key, unwanted);
      sender.send(miss);

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(2, getMessageCount(control));

      // Remove only the message matching the filter.
      final String filter = key + " =" + wanted;
      final int removedCount = control.removeMessages(filter);
      Assert.assertEquals(1, removedCount);
      Assert.assertEquals(1, getMessageCount(control));

      // The non-matching message is still available for consumption.
      final ClientConsumer receiver = session.createConsumer(queueName);
      ClientMessage received = receiver.receive(500);
      Assert.assertNotNull(received);
      Assert.assertEquals(unwanted, received.getObjectProperty(key));

      received.acknowledge();

      // Nothing else is left on the queue.
      received = receiver.receiveImmediate();
      Assert.assertNull(received);

      receiver.close();
      session.deleteQueue(queueName);
   }

   /**
    * Same scenario as a filtered removal, but calls the {@code removeMessages} overload
    * that takes a leading integer argument (5 here) in addition to the filter.
    */
   @Test
   public void testRemoveMessagesWithLimit() throws Exception {
      final SimpleString key = new SimpleString("key");
      final long wanted = RandomUtil.randomLong();
      final long unwanted = wanted + 1;

      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // One message that satisfies the filter and one that does not.
      final ClientMessage hit = session.createMessage(false);
      hit.putLongProperty(key, wanted);
      sender.send(hit);

      final ClientMessage miss = session.createMessage(false);
      miss.putLongProperty(key, unwanted);
      sender.send(miss);

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(2, getMessageCount(control));

      // Remove only the message matching the filter.
      final String filter = key + " =" + wanted;
      final int removedCount = control.removeMessages(5, filter);
      Assert.assertEquals(1, removedCount);
      Assert.assertEquals(1, getMessageCount(control));

      // The non-matching message is still available for consumption.
      final ClientConsumer receiver = session.createConsumer(queueName);
      ClientMessage received = receiver.receive(500);
      Assert.assertNotNull(received);
      Assert.assertEquals(unwanted, received.getObjectProperty(key));

      received.acknowledge();

      // Nothing else is left on the queue.
      received = receiver.receiveImmediate();
      Assert.assertNull(received);

      receiver.close();
      session.deleteQueue(queueName);
   }

   /**
    * Checks that {@code removeMessages(null)} removes every message from the queue.
    */
   @Test
   public void testRemoveMessagesWithNullFilter() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // Two messages go to the queue.
      for (int n = 0; n < 2; n++) {
         sender.send(session.createMessage(false));
      }

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(2, getMessageCount(control));

      // A null filter is expected to remove both messages.
      final int removedCount = control.removeMessages(null);
      Assert.assertEquals(2, removedCount);
      Assert.assertEquals(0, getMessageCount(control));

      session.deleteQueue(queueName);
   }

   /**
    * Checks that {@code removeAllMessages()} removes every message from the queue and
    * reports how many were removed.
    */
   @Test
   public void testRemoveAllMessages() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // Two messages go to the queue.
      for (int n = 0; n < 2; n++) {
         sender.send(session.createMessage(false));
      }

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(2, getMessageCount(control));

      // Both messages are expected to be removed in one call.
      final int removedCount = control.removeAllMessages();
      Assert.assertEquals(2, removedCount);
      Assert.assertEquals(0, getMessageCount(control));

      session.deleteQueue(queueName);
   }

   /**
    * Checks that {@code removeMessages("")} removes every message from the queue.
    */
   @Test
   public void testRemoveMessagesWithEmptyFilter() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // Two messages go to the queue.
      for (int n = 0; n < 2; n++) {
         sender.send(session.createMessage(false));
      }

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(2, getMessageCount(control));

      // An empty filter string is expected to remove both messages.
      final int removedCount = control.removeMessages("");
      Assert.assertEquals(2, removedCount);
      Assert.assertEquals(0, getMessageCount(control));

      session.deleteQueue(queueName);
   }

   /**
    * Removes a single message, selected by its message ID, and checks that exactly one
    * message remains to be consumed.
    */
   @Test
   public void testRemoveMessage() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // Two messages go to the queue.
      for (int n = 0; n < 2; n++) {
         sender.send(session.createMessage(false));
      }

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(2, getMessageCount(control));

      // Message IDs are assigned by the server, so read one back from the listing.
      final Map<String, Object>[] listed = control.listMessages(null);
      Assert.assertEquals(2, listed.length);
      final long firstId = (Long) listed[0].get("messageID");

      // Remove the first listed message.
      Assert.assertTrue(control.removeMessage(firstId));
      Assert.assertEquals(1, getMessageCount(control));

      // Exactly one message is left to consume.
      consumeMessages(1, session, queueName);

      session.deleteQueue(queueName);
   }

   /**
    * Schedules two messages for later delivery, removes one of them by message ID while
    * it is still scheduled, and checks that one message is eventually consumable.
    */
   @Test
   public void testRemoveScheduledMessage() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // Both messages carry a scheduled delivery time five seconds from now.
      final long deliveryTime = System.currentTimeMillis() + 5000;

      final ClientMessage firstScheduled = session.createMessage(true);
      firstScheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
      sender.send(firstScheduled);

      final ClientMessage secondScheduled = session.createMessage(true);
      secondScheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
      sender.send(secondScheduled);

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(2, control.getScheduledCount());

      // Message IDs are assigned by the server, so read one back from the scheduled listing.
      final Map<String, Object>[] scheduled = control.listScheduledMessages();
      Assert.assertEquals(2, scheduled.length);
      final long firstId = (Long) scheduled[0].get("messageID");

      // Remove the first scheduled message.
      Assert.assertTrue(control.removeMessage(firstId));
      Assert.assertEquals(1, control.getScheduledCount());

      // Poll until the remaining message is no longer counted as scheduled, or the delivery time passes.
      while (System.currentTimeMillis() < deliveryTime && control.getScheduledCount() == 1) {
         Thread.sleep(100);
      }

      // Exactly one message is left to consume.
      consumeMessages(1, session, queueName);

      session.deleteQueue(queueName);
   }

   /**
    * Sends 100 messages, receives 50 of them without acknowledging, and then removes the
    * first message that is still listed on the queue. The test expects the listing to
    * hold the 50 messages that were not received, starting at {@code count == 50}.
    */
   @Test
   public void testRemoveMessage2() throws Exception {
      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // Each message is tagged with its sequence number in the "count" property.
      for (int seq = 0; seq < 100; seq++) {
         final ClientMessage outgoing = session.createMessage(false);
         outgoing.putIntProperty("count", seq);
         sender.send(outgoing);
      }

      final ClientConsumer receiver = session.createConsumer(queueName);
      session.start();

      // Receive half of the messages and keep the references; none of them is acknowledged.
      final LinkedList<ClientMessage> received = new LinkedList<>();
      for (int n = 0; n < 50; n++) {
         received.add(receiver.receive(1000));
      }

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(100, getMessageCount(control));

      // Message IDs are assigned by the server, so read one back from the listing.
      final Map<String, Object>[] listed = control.listMessages(null);
      Assert.assertEquals(50, listed.length);
      final int firstListedCount = Integer.parseInt((listed[0].get("count")).toString());
      assertEquals(50, firstListedCount);
      final long firstListedId = (Long) listed[0].get("messageID");

      // Remove the first listed message.
      Assert.assertTrue(control.removeMessage(firstListedId));
      Assert.assertEquals(99, getMessageCount(control));

      receiver.close();

      // 99 messages are left to consume once the first consumer is closed.
      consumeMessages(99, session, queueName);

      session.deleteQueue(queueName);
   }

   /**
    * Sends the matching message twice and the non-matching message once, then checks
    * that {@code countMessages} reports 2 and 1 for the respective filters.
    */
   @Test
   public void testCountMessagesWithFilter() throws Exception {
      final SimpleString key = new SimpleString("key");
      final long wanted = RandomUtil.randomLong();
      final long unwanted = wanted + 1;

      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // Build both messages up front; the matching one is sent twice.
      final ClientMessage hit = session.createMessage(false);
      hit.putLongProperty(key, wanted);
      final ClientMessage miss = session.createMessage(false);
      miss.putLongProperty(key, unwanted);

      sender.send(hit);
      sender.send(miss);
      sender.send(hit);

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(3, getMessageCount(control));

      final String wantedFilter = key + " =" + wanted;
      final String unwantedFilter = key + " =" + unwanted;
      Assert.assertEquals(2, control.countMessages(wantedFilter));
      Assert.assertEquals(1, control.countMessages(unwantedFilter));

      session.deleteQueue(queueName);
   }

   /**
    * Sends 100 messages with one string property value and 10 with another, then checks
    * that counting with a filter on a property no message carries yields 0, while
    * filters on the real property yield 100 and 10.
    */
   @Test
   public void testCountMessagesWithInvalidFilter() throws Exception {
      final SimpleString key = new SimpleString("key");
      final String commonValue = "MATCH";
      final String rareValue = "DIFFERENT";

      final SimpleString addressName = RandomUtil.randomSimpleString();
      final SimpleString queueName = RandomUtil.randomSimpleString();

      session.createQueue(addressName, queueName, null, false);
      final ClientProducer sender = session.createProducer(addressName);

      // 100 messages carry the common value ...
      for (int n = 0; n < 100; n++) {
         final ClientMessage outgoing = session.createMessage(false);
         outgoing.putStringProperty(key, SimpleString.toSimpleString(commonValue));
         sender.send(outgoing);
      }

      // ... and 10 carry the rare one.
      for (int n = 0; n < 10; n++) {
         final ClientMessage outgoing = session.createMessage(false);
         outgoing.putStringProperty(key, SimpleString.toSimpleString(rareValue));
         sender.send(outgoing);
      }

      // this is just to guarantee a round trip and avoid in transit messages, so they are all accounted for
      session.commit();

      // The same filter, on a property no message has, is used for the consumer and for counting.
      final String absentPropertyFilter = "nonExistentProperty like \'%Temp/88\'";
      final ClientConsumer receiver = session.createConsumer(queueName, SimpleString.toSimpleString(absentPropertyFilter));

      session.start();

      assertNull(receiver.receiveImmediate());

      final QueueControl control = createManagementControl(addressName, queueName);
      Assert.assertEquals(110, getMessageCount(control));

      Assert.assertEquals(0, control.countMessages(absentPropertyFilter));

      Assert.assertEquals(100, control.countMessages(key + "=\'" + commonValue + "\'"));
      Assert.assertEquals(10, control.countMessages(key + " = \'" + rareValue + "\'"));

      receiver.close();

      session.deleteQueue(queueName);
   }

   /**
    * Sends two messages with different {@code key} values, expires only the one selected by a
    * filter and checks that the other message is the single one left to consume.
    */
   @Test
   public void testExpireMessagesWithFilter() throws Exception {
      final SimpleString propertyName = new SimpleString("key");
      final long matchingValue = RandomUtil.randomLong();
      final long unmatchingValue = matchingValue + 1;

      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);
      final ClientProducer producer = session.createProducer(address);

      // one message the filter will select ...
      final ClientMessage toExpire = session.createMessage(false);
      toExpire.putLongProperty(propertyName, matchingValue);
      producer.send(toExpire);
      // ... and one it will not
      final ClientMessage toKeep = session.createMessage(false);
      toKeep.putLongProperty(propertyName, unmatchingValue);
      producer.send(toKeep);

      final QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertEquals(2, getMessageCount(queueControl));

      final String filter = propertyName + " =" + matchingValue;
      Assert.assertEquals(1, queueControl.expireMessages(filter));
      Assert.assertEquals(1, getMessageCount(queueControl));

      // the remaining message must be the one that did not match
      final ClientConsumer consumer = session.createConsumer(queue);
      ClientMessage received = consumer.receive(500);
      Assert.assertNotNull(received);
      Assert.assertEquals(unmatchingValue, received.getObjectProperty(propertyName));
      received.acknowledge();

      // nothing else may be waiting on the queue
      received = consumer.receiveImmediate();
      Assert.assertNull(received);

      consumer.close();
      session.deleteQueue(queue);
      session.close();
   }

   /**
    * Expires a single message by ID after an expiry address has been configured for its address
    * and checks that the message leaves the original queue and shows up on the expiry queue.
    */
   @Test
   public void testExpireMessage() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      final SimpleString expiryAddress = RandomUtil.randomSimpleString();
      final SimpleString expiryQueue = RandomUtil.randomSimpleString();

      session.createQueue(address, queue, null, false);
      session.createQueue(expiryAddress, expiryQueue, null, false);

      // put one message on the source queue
      final ClientProducer producer = session.createProducer(address);
      producer.send(session.createMessage(false));

      final QueueControl sourceControl = createManagementControl(address, queue);
      final QueueControl expiryControl = createManagementControl(expiryAddress, expiryQueue);
      Assert.assertEquals(1, getMessageCount(sourceControl));
      Assert.assertEquals(0, getMessageCount(expiryControl));

      // IDs are assigned by the server, so look the ID up through the management API
      final Map<String, Object>[] listed = sourceControl.listMessages(null);
      Assert.assertEquals(1, listed.length);
      final long messageID = (Long) listed[0].get("messageID");

      server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setExpiryAddress(expiryAddress));

      Assert.assertTrue(sourceControl.expireMessage(messageID));
      Assert.assertEquals(0, getMessageCount(sourceControl));
      Assert.assertEquals(1, getMessageCount(expiryControl));

      consumeMessages(0, session, queue);
      consumeMessages(1, session, expiryQueue);

      session.deleteQueue(queue);
      session.deleteQueue(expiryQueue);
      session.close();
   }

   /**
    * Sends two messages, moves the first one to the configured dead letter address by ID and
    * checks that exactly one message remains on each of the two queues.
    */
   @Test
   public void testSendMessageToDeadLetterAddress() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      final SimpleString deadLetterAddress = RandomUtil.randomSimpleString();
      final SimpleString deadLetterQueue = RandomUtil.randomSimpleString();

      session.createQueue(address, queue, null, false);
      session.createQueue(deadLetterAddress, deadLetterQueue, null, false);

      // two messages on the source queue
      final ClientProducer producer = session.createProducer(address);
      for (int i = 0; i < 2; i++) {
         producer.send(session.createMessage(false));
      }

      final QueueControl sourceControl = createManagementControl(address, queue);
      final QueueControl dlqControl = createManagementControl(deadLetterAddress, deadLetterQueue);
      Assert.assertEquals(2, getMessageCount(sourceControl));

      // IDs are assigned by the server, so look the ID up through the management API
      final Map<String, Object>[] listed = sourceControl.listMessages(null);
      Assert.assertEquals(2, listed.length);
      final long firstMessageID = (Long) listed[0].get("messageID");

      server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setDeadLetterAddress(deadLetterAddress));

      Assert.assertEquals(0, getMessageCount(dlqControl));
      Assert.assertTrue(sourceControl.sendMessageToDeadLetterAddress(firstMessageID));
      Assert.assertEquals(1, getMessageCount(sourceControl));
      Assert.assertEquals(1, getMessageCount(dlqControl));

      // exactly one message left on the source queue ...
      consumeMessages(1, session, queue);
      // ... and exactly one on the dead letter queue
      consumeMessages(1, session, deadLetterQueue);

      session.deleteQueue(queue);
      session.deleteQueue(deadLetterQueue);
   }

   /**
    * Sends a message with priority 1, changes it to 8 through the management API and checks that
    * the consumed message reports the new priority.
    */
   @Test
   public void testChangeMessagePriority() throws Exception {
      final byte originalPriority = (byte) 1;
      final byte newPriority = (byte) 8;

      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);

      final ClientProducer producer = session.createProducer(address);
      final ClientMessage outgoing = session.createMessage(false);
      outgoing.setPriority(originalPriority);
      producer.send(outgoing);

      final QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertEquals(1, getMessageCount(queueControl));

      // IDs are assigned by the server, so look the ID up through the management API
      final Map<String, Object>[] listed = queueControl.listMessages(null);
      Assert.assertEquals(1, listed.length);
      final long messageID = (Long) listed[0].get("messageID");

      Assert.assertTrue(queueControl.changeMessagePriority(messageID, newPriority));

      final ClientConsumer consumer = session.createConsumer(queue);
      final ClientMessage received = consumer.receive(500);
      Assert.assertNotNull(received);
      Assert.assertEquals(newPriority, received.getPriority());

      consumer.close();
      session.deleteQueue(queue);
   }

   /**
    * Tries to change a message's priority to 23 and checks that the operation throws and that the
    * consumed message does not carry that priority.
    */
   @Test
   public void testChangeMessagePriorityWithInvalidValue() throws Exception {
      final byte invalidPriority = (byte) 23;

      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);

      final ClientProducer producer = session.createProducer(address);
      producer.send(session.createMessage(false));

      final QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertEquals(1, getMessageCount(queueControl));

      // IDs are assigned by the server, so look the ID up through the management API
      final Map<String, Object>[] listed = queueControl.listMessages(null);
      Assert.assertEquals(1, listed.length);
      final long messageID = (Long) listed[0].get("messageID");

      try {
         queueControl.changeMessagePriority(messageID, invalidPriority);
         Assert.fail("operation fails when priority value is < 0 or > 9");
      } catch (Exception expected) {
         // the call is supposed to be rejected; the AssertionError raised by fail() is not caught here
      }

      final ClientConsumer consumer = session.createConsumer(queue);
      final ClientMessage received = consumer.receive(500);
      Assert.assertNotNull(received);
      Assert.assertFalse(invalidPriority == received.getPriority());

      consumer.close();
      session.deleteQueue(queue);
   }

   /**
    * Enables message counters with a sample period of 100 and checks the depth and count values
    * (and their deltas) reported by {@code listMessageCounter} before any send, after each of two
    * sends and after both messages have been consumed.
    */
   @Test
   public void testListMessageCounter() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);
      final QueueControl queueControl = createManagementControl(address, queue);

      final ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
      serverControl.enableMessageCounters();
      serverControl.setMessageCounterSamplePeriod(100);

      // nothing has been sent yet
      MessageCounterInfo counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(0, counter.getDepth());
      Assert.assertEquals(0, counter.getCount());

      final ClientProducer producer = session.createProducer(address);

      // first message
      producer.send(session.createMessage(false));
      Thread.sleep(200);
      counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(1, counter.getDepth());
      Assert.assertEquals(1, counter.getDepthDelta());
      Assert.assertEquals(1, counter.getCount());
      Assert.assertEquals(1, counter.getCountDelta());

      // second message
      producer.send(session.createMessage(false));
      Thread.sleep(200);
      counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(2, counter.getDepth());
      Assert.assertEquals(1, counter.getDepthDelta());
      Assert.assertEquals(2, counter.getCount());
      Assert.assertEquals(1, counter.getCountDelta());

      // drain the queue: depth falls back to 0 while the count stays at 2
      consumeMessages(2, session, queue);
      Thread.sleep(200);
      counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(0, counter.getDepth());
      Assert.assertEquals(-2, counter.getDepthDelta());
      Assert.assertEquals(2, counter.getCount());
      Assert.assertEquals(0, counter.getCountDelta());

      session.deleteQueue(queue);
   }

   /**
    * Checks that {@code resetMessageCounter} brings depth, count and both deltas back to zero
    * after one message has been sent and consumed.
    */
   @Test
   public void testResetMessageCounter() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);
      final QueueControl queueControl = createManagementControl(address, queue);

      final ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
      serverControl.enableMessageCounters();
      serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);

      // nothing has been sent yet
      MessageCounterInfo counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(0, counter.getDepth());
      Assert.assertEquals(0, counter.getCount());

      // one message in
      final ClientProducer producer = session.createProducer(address);
      producer.send(session.createMessage(false));

      // each check waits for twice the sample period before reading the counter
      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
      counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(1, counter.getDepth());
      Assert.assertEquals(1, counter.getDepthDelta());
      Assert.assertEquals(1, counter.getCount());
      Assert.assertEquals(1, counter.getCountDelta());

      // one message out
      consumeMessages(1, session, queue);

      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
      counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(0, counter.getDepth());
      Assert.assertEquals(-1, counter.getDepthDelta());
      Assert.assertEquals(1, counter.getCount());
      Assert.assertEquals(0, counter.getCountDelta());

      // after the reset every value is expected to read zero
      queueControl.resetMessageCounter();

      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
      counter = MessageCounterInfo.fromJSON(queueControl.listMessageCounter());
      Assert.assertEquals(0, counter.getDepth());
      Assert.assertEquals(0, counter.getDepthDelta());
      Assert.assertEquals(0, counter.getCount());
      Assert.assertEquals(0, counter.getCountDelta());

      session.deleteQueue(queue);
   }

   /**
    * Checks that {@code listMessageCounterAsHTML} returns a non-null value for a fresh queue.
    */
   @Test
   public void testListMessageCounterAsHTML() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);

      final QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertNotNull(queueControl.listMessageCounterAsHTML());

      session.deleteQueue(queue);
   }

   /**
    * Enables message counters and checks that the counter history of a fresh queue parses into
    * exactly one {@code DayCounterInfo} entry.
    */
   @Test
   public void testListMessageCounterHistory() throws Exception {
      final long samplePeriod = 1000;
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);
      final QueueControl queueControl = createManagementControl(address, queue);

      final ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
      serverControl.enableMessageCounters();
      serverControl.setMessageCounterSamplePeriod(samplePeriod);

      final DayCounterInfo[] history = DayCounterInfo.fromJSON(queueControl.listMessageCounterHistory());
      Assert.assertEquals(1, history.length);

      session.deleteQueue(queue);
   }

   /**
    * Enables message counters and checks that {@code listMessageCounterHistoryAsHTML} returns a
    * non-null value for a fresh queue.
    */
   @Test
   public void testListMessageCounterHistoryAsHTML() throws Exception {
      final long samplePeriod = 1000;
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);
      final QueueControl queueControl = createManagementControl(address, queue);

      final ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
      serverControl.enableMessageCounters();
      serverControl.setMessageCounterSamplePeriod(samplePeriod);

      Assert.assertNotNull(queueControl.listMessageCounterHistoryAsHTML());

      session.deleteQueue(queue);
   }

   /**
    * Moves ten messages carrying duplicate-detection IDs from q1 to q2 and then back to q1 with a
    * single bulk {@code moveMessages} call (third argument {@code false}), and checks that all
    * ten can be consumed from q1 again.
    */
   @Test
   public void testMoveMessagesBack() throws Exception {
      final SimpleString firstQueue = new SimpleString("q1");
      final SimpleString secondQueue = new SimpleString("q2");
      server.createQueue(firstQueue, RoutingType.MULTICAST, firstQueue, null, true, false);
      server.createQueue(secondQueue, RoutingType.MULTICAST, secondQueue, null, true, false);

      // this test works on its own locator and session rather than the ones built in setUp()
      final ServerLocator ownLocator = createInVMNonHALocator();
      final ClientSessionFactory factory = createSessionFactory(ownLocator);
      final ClientSession ownSession = factory.createSession(true, true);

      final ClientProducer producer = ownSession.createProducer("q1");
      int produced = 0;
      while (produced < 10) {
         final ClientMessage outgoing = ownSession.createMessage(true);
         outgoing.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + produced));
         producer.send(outgoing);
         produced++;
      }
      ownSession.commit();

      // peek at q1 to make sure the messages arrived
      ClientConsumer consumer = ownSession.createConsumer("q1", true);
      ownSession.start();
      assertNotNull(consumer.receive(5000));
      consumer.close();

      final QueueControl firstControl = ManagementControlHelper.createQueueControl(firstQueue, firstQueue, mbeanServer);
      final QueueControl secondControl = ManagementControlHelper.createQueueControl(secondQueue, secondQueue, mbeanServer);

      // q1 -> q2
      assertEquals(10, firstControl.moveMessages(null, "q2"));

      consumer = ownSession.createConsumer("q2", true);
      assertNotNull(consumer.receive(500));
      consumer.close();

      // q2 -> q1
      secondControl.moveMessages(null, "q1", false);

      ownSession.start();
      consumer = ownSession.createConsumer("q1");
      for (int received = 0; received < 10; received++) {
         final ClientMessage incoming = consumer.receive(5000);
         assertNotNull(incoming);
         incoming.acknowledge();
      }
      consumer.close();

      ownSession.deleteQueue("q1");
      ownSession.deleteQueue("q2");
      ownSession.close();
      ownLocator.close();
   }

   /**
    * Moves messages carrying duplicate-detection IDs from q1 to q2 in bulk and then back to q1
    * one at a time by message ID, and checks that every message can be consumed from q1 again.
    */
   @Test
   public void testMoveMessagesBack2() throws Exception {
      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false);
      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false);

      // this test works on its own locator and session rather than the ones built in setUp()
      ServerLocator locator = createInVMNonHALocator();

      ClientSessionFactory sf = createSessionFactory(locator);

      ClientSession session = sf.createSession(true, true);

      ClientProducer prod1 = session.createProducer("q1");

      int NUMBER_OF_MSGS = 10;

      for (int i = 0; i < NUMBER_OF_MSGS; i++) {
         ClientMessage msg = session.createMessage(true);

         msg.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));

         prod1.send(msg);
      }

      session.commit();

      // peek at q1 to make sure the messages arrived
      ClientConsumer consumer = session.createConsumer("q1", true);
      session.start();

      assertNotNull(consumer.receive(5000));
      consumer.close();

      QueueControl q1Control = ManagementControlHelper.createQueueControl(new SimpleString("q1"), new SimpleString("q1"), mbeanServer);

      QueueControl q2Control = ManagementControlHelper.createQueueControl(new SimpleString("q2"), new SimpleString("q2"), mbeanServer);

      assertEquals(NUMBER_OF_MSGS, q1Control.moveMessages(null, "q2"));

      long[] messageIDs = new long[NUMBER_OF_MSGS];

      // collect the IDs of the messages now sitting on q2
      consumer = session.createConsumer("q2", true);

      for (int i = 0; i < NUMBER_OF_MSGS; i++) {
         ClientMessage msg = consumer.receive(5000);
         assertNotNull(msg);
         messageIDs[i] = msg.getMessageID();
      }

      assertNull(consumer.receiveImmediate());

      consumer.close();

      // move each message back individually; a false result means the move did not happen
      for (int i = 0; i < NUMBER_OF_MSGS; i++) {
         Assert.assertTrue("message " + messageIDs[i] + " was not moved back to q1", q2Control.moveMessage(messageIDs[i], "q1"));
      }

      session.start();
      consumer = session.createConsumer("q1");

      for (int i = 0; i < NUMBER_OF_MSGS; i++) {
         ClientMessage msg = consumer.receive(5000);
         assertNotNull(msg);
         msg.acknowledge();
      }

      consumer.close();

      session.deleteQueue("q1");

      session.deleteQueue("q2");

      session.close();

      // release the locator created above, as testMoveMessagesBack does
      locator.close();
   }

   /**
    * Checks that {@code pause()} and {@code resume()} are reflected by {@code isPaused()}.
    * <p>
    * Exceptions are propagated to the test runner; catching and printing them would let the test
    * pass even when queue creation or a management call fails.
    */
   @Test
   public void testPauseAndResume() throws Exception {
      long counterPeriod = 1000;
      SimpleString address = RandomUtil.randomSimpleString();
      SimpleString queue = RandomUtil.randomSimpleString();

      session.createQueue(address, queue, null, false);
      QueueControl queueControl = createManagementControl(address, queue);

      ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
      serverControl.enableMessageCounters();
      serverControl.setMessageCounterSamplePeriod(counterPeriod);

      Assert.assertFalse(queueControl.isPaused());
      queueControl.pause();
      Assert.assertTrue(queueControl.isPaused());
      queueControl.resume();
      Assert.assertFalse(queueControl.isPaused());

      // clean up like the other tests in this class do
      session.deleteQueue(queue);
   }

   /**
    * Checks that the messages-added counter grows with every send, is unaffected by consuming,
    * and returns to zero after {@code resetMessagesAdded}.
    */
   @Test
   public void testResetMessagesAdded() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);

      final QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertEquals(0, getMessagesAdded(queueControl));

      final ClientProducer producer = session.createProducer(address);
      for (int expectedAdded = 1; expectedAdded <= 2; expectedAdded++) {
         producer.send(session.createMessage(false));
         Assert.assertEquals(expectedAdded, getMessagesAdded(queueControl));
      }

      // consuming must not change the number of messages added
      consumeMessages(2, session, queue);
      Assert.assertEquals(2, getMessagesAdded(queueControl));

      queueControl.resetMessagesAdded();
      Assert.assertEquals(0, getMessagesAdded(queueControl));

      session.deleteQueue(queue);
   }

   /**
    * Checks that the messages-acknowledged counter grows with every consumed message and returns
    * to zero after {@code resetMessagesAcknowledged}.
    */
   @Test
   public void testResetMessagesAcknowledged() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);

      final QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertEquals(0, queueControl.getMessagesAcknowledged());

      final ClientProducer producer = session.createProducer(address);
      for (int expectedAcknowledged = 1; expectedAcknowledged <= 2; expectedAcknowledged++) {
         // send one message, consume it, then verify the running total
         producer.send(session.createMessage(false));
         consumeMessages(1, session, queue);
         Assert.assertEquals(expectedAcknowledged, queueControl.getMessagesAcknowledged());
      }

      queueControl.resetMessagesAcknowledged();
      Assert.assertEquals(0, queueControl.getMessagesAcknowledged());

      session.deleteQueue(queue);
   }

   /**
    * Expires two messages one after the other by ID, checking the messages-expired counter after
    * each, and then checks that {@code resetMessagesExpired} sets it back to zero.
    */
   @Test
   public void testResetMessagesExpired() throws Exception {
      final SimpleString address = RandomUtil.randomSimpleString();
      final SimpleString queue = RandomUtil.randomSimpleString();
      session.createQueue(address, queue, null, false);

      final QueueControl queueControl = createManagementControl(address, queue);
      Assert.assertEquals(0, queueControl.getMessagesExpired());

      final ClientProducer producer = session.createProducer(address);

      // first message: send, look up its server-assigned ID, expire it
      final ClientMessage first = session.createMessage(false);
      producer.send(first);

      final Map<String, Object>[] firstListing = queueControl.listMessages(null);
      Assert.assertEquals(1, firstListing.length);
      final long firstID = (Long) firstListing[0].get("messageID");

      queueControl.expireMessage(firstID);
      Assert.assertEquals(1, queueControl.getMessagesExpired());

      // second message: same procedure, the counter is expected to reach 2
      final ClientMessage second = session.createMessage(false);
      producer.send(second);

      final Map<String, Object>[] secondListing = queueControl.listMessages(null);
      Assert.assertEquals(1, secondListing.length);
      final long secondID = (Long) secondListing[0].get("messageID");

      queueControl.expireMessage(secondID);
      Assert.assertEquals(2, queueControl.getMessagesExpired());

      queueControl.resetMessagesExpired();
      Assert.assertEquals(0, queueControl.getMessagesExpired());

      session.deleteQueue(queue);
   }

   /**
    * Kills two messages -- one through {@code sendMessageToDeadLetterAddress}, one by rolling
    * back its delivery until the maximum number of delivery attempts is reached -- and checks
    * that {@code resetMessagesKilled} sets the messages-killed counter back to zero.
    */
   @Test
   public void testResetMessagesKilled() throws Exception {
      SimpleString address = RandomUtil.randomSimpleString();
      SimpleString queue = RandomUtil.randomSimpleString();

      session.createQueue(address, queue, null, false);

      QueueControl queueControl = createManagementControl(address, queue);
      // precondition for the counter under test (previously checked getMessagesExpired() by mistake)
      Assert.assertEquals(0, queueControl.getMessagesKilled());

      ClientProducer producer = session.createProducer(address);
      ClientMessage message = session.createMessage(false);
      producer.send(message);

      // the message IDs are set on the server
      Map<String, Object>[] messages = queueControl.listMessages(null);
      Assert.assertEquals(1, messages.length);
      long messageID = (Long) messages[0].get("messageID");

      queueControl.sendMessageToDeadLetterAddress(messageID);
      Assert.assertEquals(1, queueControl.getMessagesKilled());

      message = session.createMessage(false);
      producer.send(message);

      // send to DLA the old-fashioned way: keep rolling back until the delivery attempts run out.
      // The settings are looked up by address (not queue name) and only once, outside the loop;
      // no settings are registered in this test, so either key resolves to the same match.
      final int maxDeliveryAttempts = server.getAddressSettingsRepository().getMatch(address.toString()).getMaxDeliveryAttempts();
      ClientConsumer consumer = session.createConsumer(queue);
      for (int i = 0; i < maxDeliveryAttempts; i++) {
         message = consumer.receive(500);
         assertNotNull(message);
         message.acknowledge();
         session.rollback();
      }

      consumer.close();

      Assert.assertEquals(2, queueControl.getMessagesKilled());

      queueControl.resetMessagesKilled();

      Assert.assertEquals(0, queueControl.getMessagesKilled());

      session.deleteQueue(queue);
   }

   /**
    * Makes sure BINDING_ADDED / BINDING_REMOVED notifications are received no matter whether a
    * queue is created and destroyed directly on the server or through the server control.
    */
   @Test
   public void testCreateQueueNotification() throws Exception {
      final JMSUtil.JMXListener notificationListener = new JMSUtil.JMXListener();
      this.mbeanServer.addNotificationListener(ObjectNameBuilder.DEFAULT.getActiveMQServerObjectName(), notificationListener, null, null);

      final SimpleString directQueueName = new SimpleString("newQueue");
      final String controlQueueName = "newQueue2";
      final String bindingAdded = CoreNotificationType.BINDING_ADDED.toString();
      final String bindingRemoved = CoreNotificationType.BINDING_REMOVED.toString();

      // create and destroy directly on the server
      this.server.createQueue(directQueueName, RoutingType.ANYCAST, directQueueName, null, false, false);
      Notification received = notificationListener.getNotification();
      System.out.println("got notif: " + received);
      assertEquals(bindingAdded, received.getType());

      this.server.destroyQueue(directQueueName);
      received = notificationListener.getNotification();
      System.out.println("got notif: " + received);
      assertEquals(bindingRemoved, received.getType());

      // create and destroy through the management control
      final ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);

      serverControl.createQueue(controlQueueName, controlQueueName);
      received = notificationListener.getNotification();
      System.out.println("got notif: " + received);
      assertEquals(bindingAdded, received.getType());

      serverControl.destroyQueue(controlQueueName);
      received = notificationListener.getNotification();
      System.out.println("got notif: " + received);
      assertEquals(bindingRemoved, received.getType());
   }

   /**
    * Sends two bytes messages through {@code QueueControl.sendMessage} (once with an empty
    * header map, once with {@code null} headers) and checks that both can be browsed and carry
    * the original body.
    */
   @Test
   public void testSendMessage() throws Exception {
      SimpleString address = RandomUtil.randomSimpleString();
      SimpleString queue = RandomUtil.randomSimpleString();

      session.createQueue(address, queue, null, false);

      QueueControl queueControl = createManagementControl(address, queue);

      // encode and decode with an explicit charset so the test does not depend on the platform default
      final String encodedBody = Base64.encodeBytes("theBody".getBytes(java.nio.charset.StandardCharsets.UTF_8));

      queueControl.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, encodedBody, true, "myUser", "myPassword");
      queueControl.sendMessage(null, Message.BYTES_TYPE, encodedBody, true, "myUser", "myPassword");

      Assert.assertEquals(2, getMessageCount(queueControl));

      // read both messages back through the management API
      CompositeData[] browse = queueControl.browse(null);

      Assert.assertEquals(2, browse.length);

      byte[] body = (byte[]) browse[0].get(BODY);

      Assert.assertNotNull(body);

      // expected value first, so a failure is reported the right way round
      Assert.assertEquals("theBody", new String(body, java.nio.charset.StandardCharsets.UTF_8));

      body = (byte[]) browse[1].get(BODY);

      Assert.assertNotNull(body);

      Assert.assertEquals("theBody", new String(body, java.nio.charset.StandardCharsets.UTF_8));
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   /**
    * Starts an in-VM server with JMX management enabled and opens the started session that the
    * tests in this class share.
    */
   @Override
   @Before
   public void setUp() throws Exception {
      super.setUp();

      final Configuration config = createDefaultInVMConfig().setJMXManagementEnabled(true);
      server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, false));
      server.start();

      locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setConsumerWindowSize(0);

      final ClientSessionFactory factory = createSessionFactory(locator);
      session = factory.createSession(false, true, false);
      session.start();
   }

   /**
    * Reads the {@code messageID} of the first element of the JSON array returned by
    * {@code getFirstMessageAsJSON()}.
    *
    * @param queueControl the control of the queue to inspect
    * @return the {@code messageID} value of the first array element
    */
   protected long getFirstMessageId(final QueueControl queueControl) throws Exception {
      final JsonArray firstMessageJson = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
      final JsonObject firstMessage = firstMessageJson.getJsonObject(0);
      return firstMessage.getJsonNumber("messageID").longValue();
   }
}

Re: [ARTEMIS] A diverted message cannot be retried from DLQ

Posted by Archibald <ar...@gmx.net>.
See also: https://issues.apache.org/jira/browse/ARTEMIS-1645.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: [ARTEMIS] A diverted message cannot be retried from DLQ

Posted by Archibald <ar...@gmx.net>.
Yes, I stumbled across this issue running 2.4.0. But wrote a test against
master.

Br, A.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: [ARTEMIS] A diverted message cannot be retried from DLQ

Posted by mbukosky <mb...@paypal.com>.
Do you know off hand if this issue is also in version 2.4?



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: [ARTEMIS] A diverted message cannot be retried from DLQ

Posted by Archibald <ar...@gmx.net>.
Before someone asks: I ran the test against current master.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: [ARTEMIS] A diverted message cannot be retried from DLQ

Posted by Archibald <ar...@gmx.net>.
Hmm, I don't know why the original mail message text is not being posted here,
but only the attachment.
Anyway, here it is again:

Given a topic SOURCE and a divert which forwards a message M to a queue
TARGET.
The consumer fails to process M and M is sent to the DLQ.
 
If you now retry M from the DLQ, it is not sent to TARGET; instead you get
AMQ222196: Could not find binding ...
And even worse, the message is lost afterwards...
 
My suspicion is that the message properties are not correct regarding
_AMQ_ORIG_ADDRESS and _AMQ_ORIG_QUEUE.
Is: _AMQ_ORIG_ADDRESS=<Address from SOURCE>, _AMQ_ORIG_QUEUE=TARGET
Should be:  _AMQ_ORIG_ADDRESS=<Address FROM TARGET>, _AMQ_ORIG_QUEUE=TARGET
 
Attached you'll find a testcase "testRetryDivertedMessage" which
demonstrates the problem.
 
Can this be fixed? a) Retry should be possible. b) If not, the message
should not be removed.

Br, Archibald



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html