You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2022/10/12 12:41:09 UTC

[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4183: ARTEMIS-3875 - adding consumer and producer metrics

gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r981040625


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java:
##########
@@ -441,6 +446,84 @@ public void testGetConsumerJSON() throws Exception {
       session.deleteQueue(queue);
    }
 
+   @Test
+   public void testGetConsumerWithMessagesJSON() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      ClientProducer producer = session.createProducer(address);
+
+      for (int i = 0; i < 10; i++) {
+         producer.send(session.createMessage(true));
+      }
+
+      Wait.assertEquals(0, () -> queueControl.getConsumerCount());
+
+      ClientConsumer consumer = session.createConsumer(queue);
+      Wait.assertEquals(1, () -> queueControl.getConsumerCount());
+
+      session.start();
+
+      ClientMessage clientMessage = null;
+
+      int size = 0;
+      for (int i = 0; i < 5; i++) {
+         clientMessage = consumer.receiveImmediate();
+         size += clientMessage.getEncodeSize();
+      }
+
+      JsonArray obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
+
+      assertEquals(1, obj.size());
+
+      Wait.assertEquals(5, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
+
+      obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
+
+      JsonObject jsonObject = obj.get(0).asJsonObject();
+
+      assertEquals(5, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
+
+      assertEquals(size, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
+
+      assertEquals(5, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
+
+      assertEquals(size, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()));
+
+      assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
+
+      //we cant assume an elapseed time to only checking for its existence
+      assertNotNull(jsonObject.getInt(ConsumerField.LAST_DELIVERED_TIME.getName()));
+
+      assertEquals( 0, jsonObject.getInt(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()));

Review Comment:
   If these are going to be a timestamp they can be asserted to be a value within a fairly specific delta during the test itself, and also their values relative to each other. E.g ensure they are in flight, then a further 1ms delay before acking would mean the 2 values also have to differ by at least that much, but be after the point the test started, and you can then also put a narrow delta for the ack time value by using the time you called it + allowance.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -2342,8 +2331,8 @@ public Pair<SimpleString, EnumSet<RoutingType>> getAddressAndRoutingTypes(Simple
    @Override
    public void addProducer(ServerProducer serverProducer) {
       serverProducer.setSessionID(getName());
-      serverProducer.setConnectionID(getConnectionID().toString());
-      producers.put(serverProducer.getID(), serverProducer);
+      serverProducer.setConnectionID(getConnectionID() != null ? getConnectionID().toString() : null);
+      producers.put(serverProducer.getAddress(), serverProducer);

Review Comment:
   This makes the addition use getAddress() however the removeProducer() method below it is still using the 'ID' value which will likely mismatch the add+remove operations and so could lead to retention.
   
   Also, by using getAddress() here, it might mean the producer addition here may not align with the producer lookup later in doSend for an anonymous producer (and perhaps also producer to FQQN?), meaning a new address-specific 'synthetic producer' value could get created in doSend() per-address and leading to multiple ServerProducerImpl entries for a single actual producer, throwing off the producer count and also making the metric values wrong for the initial actual producer (since the metrics would be assigned to the 'synthetic duplicates' per-address). The change in ActiveMQPacketHandler.java would presumably have been to stop [part of] this problem manifesting for Core.
   
   If that issue didnt exist, two producers coming in with the same address would potentially cause the metrics to that point to be discarded.
   
   



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -1958,14 +1931,29 @@ public StorageManager getStorageManager() {
 
    @Override
    public void describeProducersInfo(JsonArrayBuilder array) throws Exception {
-      Map<SimpleString, Pair<Object, AtomicLong>> targetCopy = cloneTargetAddresses();
-
-      for (Map.Entry<SimpleString, Pair<Object, AtomicLong>> entry : targetCopy.entrySet()) {
+      Map<String, ServerProducer> targetCopy = cloneProducers();
+      String sessionClientID = getRemotingConnection().getClientID();
+      String localAddress = getRemotingConnection().getTransportConnection().getLocalAddress();
+      String remoteAddress = getRemotingConnection().getTransportConnection().getRemoteAddress();
+      for (Map.Entry<String, ServerProducer> entry : targetCopy.entrySet()) {
          String uuid = null;
-         if (entry.getValue().getA() != null) {
-            uuid = entry.getValue().getA().toString();
+         if (entry.getValue().getUserID() != null) {
+            uuid = entry.getValue().getUserID().toString();
          }
-         JsonObjectBuilder producerInfo = JsonLoader.createObjectBuilder().add("connectionID", this.getConnectionID().toString()).add("sessionID", this.getName()).add("destination", entry.getKey().toString()).add("lastUUIDSent", uuid, JsonValue.NULL).add("msgSent", entry.getValue().getB().longValue());
+         JsonObjectBuilder producerInfo = JsonLoader.createObjectBuilder()
+               .add(ProducerField.ID.getName(), getName())
+               .add(ProducerField.CONNECTION_ID.getName(), this.getConnectionID().toString())
+               .add(ProducerField.SESSION.getAlternativeName(), this.getName())
+               .add(ProducerField.CLIENT_ID.getName(), sessionClientID != null ? sessionClientID : "")
+               .add(ProducerField.USER.getName(), getUsername() != null ? getUsername() : "")
+               .add(ProducerField.PROTOCOL.getName(), remotingConnection.getProtocolName())
+               .add(ProducerField.LOCAL_ADDRESS.getName(), localAddress != null ? localAddress : "")
+               .add(ProducerField.REMOTE_ADDRESS.getName(), remoteAddress != null ? remoteAddress : "")

Review Comment:
   These seem superfluous given they are session or connection details, which should respectively have all these already.
   
   It seems consumer bits also do this sometimes, but it only seems to be the 'ConsumerView' adding these details for certain management stuff, the apparent equivalent of this producer method for the consumers in ActiveMQServerControlImpl.listConsumersAsJSON / toJSONObject(ServerConsumer) doesnt add these connection/session details the rest of the time. I think matching that behaviour by only having 'ProducerView' add them in similar cases would make sense.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -2769,27 +2775,554 @@ public void testListAllConsumersAsJSON() throws Exception {
       JsonObject first = sorted[0];
       JsonObject second = sorted[1];
 
-      Assert.assertTrue(first.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(first.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(first.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory.getConnection().getID().toString(), first.getString("connectionID"));
-      Assert.assertTrue(first.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session).getName(), first.getString("sessionID"));
-      Assert.assertTrue(first.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), first.getString("queueName"));
-      Assert.assertEquals(false, first.getBoolean("browseOnly"));
-      Assert.assertEquals(0, first.getJsonNumber("deliveringCount").longValue());
+      Assert.assertTrue(first.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue() > 0);
+      Assert.assertNotNull(first.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      Assert.assertTrue(first.getString(ConsumerField.CONNECTION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(factory.getConnection().getID().toString(), first.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      Assert.assertTrue(first.getString(ConsumerField.SESSION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(((ClientSessionImpl) session).getName(), first.getString(ConsumerField.SESSION.getAlternativeName()));
+      Assert.assertTrue(first.getString(ConsumerField.QUEUE.getAlternativeName()).length() > 0);
+      Assert.assertEquals(queueName.toString(), first.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, first.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
+
+      Assert.assertTrue(second.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue() > 0);
+      Assert.assertNotNull(second.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      Assert.assertTrue(second.getString(ConsumerField.CONNECTION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(factory2.getConnection().getID().toString(), second.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      Assert.assertTrue(second.getString(ConsumerField.SESSION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(((ClientSessionImpl) session2).getName(), second.getString(ConsumerField.SESSION.getAlternativeName()));
+      Assert.assertTrue(second.getString(ConsumerField.QUEUE.getAlternativeName()).length() > 0);
+      Assert.assertEquals(queueName.toString(), second.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, second.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
 
-      Assert.assertTrue(second.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(second.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(second.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory2.getConnection().getID().toString(), second.getString("connectionID"));
-      Assert.assertTrue(second.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session2).getName(), second.getString("sessionID"));
-      Assert.assertTrue(second.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), second.getString("queueName"));
-      Assert.assertEquals(false, second.getBoolean("browseOnly"));
-      Assert.assertEquals(0, second.getJsonNumber("deliveringCount").longValue());
+   }
+
+
+   @Test
+   public void testListAllConsumersAsJSONTXCommit() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
+      } else {
+         server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, -1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);

Review Comment:
   Necessary? Repeat comment for other tests below



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -1185,7 +1182,7 @@ public void deleteQueue(final SimpleString queueToDelete) throws Exception {
       }
 
       if (server.getAddressInfo(unPrefixedQueueName) == null) {
-         targetAddressInfos.remove(queueToDelete);
+         producers.remove(queueToDelete.toString());

Review Comment:
   The addition to the map uses the message or producer address rather than the queue name this removal does, what happens when these differ? (Thinking mainly topic sub queues, but maybe even FQQN usage).
   
   Also the actual queue being removed was the one from "unPrefixedQueueName" variable, while this isnt and may be different..is that a factor?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -4023,6 +4565,114 @@ public void testListProducers() throws Exception {
       }
    }
 
+   @Test
+   public void testListProducersMessageCounts() throws Exception {
+      SimpleString queueName1 = new SimpleString("my_queue_one");
+      SimpleString addressName1 = new SimpleString("my_address_one");
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
+      if (legacyCreateQueue) {
+         server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, false, false);
+      } else {
+         server.createQueue(new QueueConfiguration(queueName1).setAddress(addressName1).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      int numMessages = 10;
+
+
+      // create some consumers
+      try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator);) {
+
+         ClientSession session1 = csf.createSession();
+
+         ClientProducer producer1 = session1.createProducer(addressName1);
+         int messagesSize = 0;
+         for (int i = 0; i < numMessages; i++) {
+            ClientMessage message = session1.createMessage(true);
+            producer1.send(message);
+            messagesSize += message.getEncodeSize();
+         }
+         //bring back all producers
+         String filterString = createJsonFilter("", "", "");
+         String producersAsJsonString = serverControl.listProducers(filterString, 1, 50);
+         JsonObject producersAsJsonObject = JsonUtil.readJsonObject(producersAsJsonString);
+         JsonArray array = (JsonArray) producersAsJsonObject.get("data");
+
+         Assert.assertEquals("number of producers returned from query", 1, array.size());
+
+         JsonObject jsonSession = array.getJsonObject(0);
+
+         //check all fields
+         Assert.assertNotEquals(ProducerField.ID.getName(), "", jsonSession.getString(ProducerField.ID.getName()));
+         Assert.assertNotEquals(ProducerField.SESSION.getName(), "", jsonSession.getString(ProducerField.SESSION.getName()));
+         Assert.assertEquals(ProducerField.CLIENT_ID.getName(), "", jsonSession.getString(ProducerField.CLIENT_ID.getName()));
+         Assert.assertEquals(ProducerField.USER.getName(), "", jsonSession.getString(ProducerField.USER.getName()));
+         Assert.assertNotEquals(ProducerField.PROTOCOL.getAlternativeName(), "", jsonSession.getString(ProducerField.PROTOCOL.getName()));
+         Assert.assertEquals(ProducerField.ADDRESS.getName(), addressName1.toString(), jsonSession.getString(ProducerField.ADDRESS.getName()));
+         Assert.assertNotEquals(ProducerField.LOCAL_ADDRESS.getName(), "", jsonSession.getString(ProducerField.LOCAL_ADDRESS.getName()));
+         Assert.assertNotEquals(ProducerField.REMOTE_ADDRESS.getName(), "", jsonSession.getString(ProducerField.REMOTE_ADDRESS.getName()));
+         Assert.assertNotEquals(ProducerField.CREATION_TIME.getName(), "", jsonSession.getString(ProducerField.CREATION_TIME.getName()));
+         Assert.assertEquals(ProducerField.MESSAGE_SENT.getName(), numMessages, jsonSession.getInt(ProducerField.MESSAGE_SENT.getName()));
+         Assert.assertEquals(ProducerField.MESSAGE_SENT_SIZE.getName(), messagesSize, jsonSession.getInt(ProducerField.MESSAGE_SENT_SIZE.getName()));
+         Assert.assertEquals(ProducerField.LAST_UUID_SENT.getName(), "", jsonSession.getString(ProducerField.LAST_UUID_SENT.getName()));
+      }
+   }
+
+   @Test
+   public void testListProducersMessageCounts2() throws Exception {
+      SimpleString queueName1 = new SimpleString("my_queue_one");
+      SimpleString addressName1 = new SimpleString("my_address_one");
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
+      if (legacyCreateQueue) {
+         server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, false, false);
+      } else {
+         server.createQueue(new QueueConfiguration(queueName1).setAddress(addressName1).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      int numMessages = 10;
+
+
+      // create some consumers
+      try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator);) {
+
+         ClientSession session1 = csf.createSession();
+
+         ClientProducer producer1 = session1.createProducer(addressName1);
+         int messagesSize = 0;
+         for (int i = 0; i < numMessages; i++) {
+            ClientMessage message = session1.createMessage(true);
+            producer1.send(message);
+            messagesSize += message.getEncodeSize();
+         }
+         //bring back all producers
+         String producersAsJsonString = serverControl.listProducersInfoAsJSON();
+         JsonArray jsonArray = JsonUtil.readJsonArray(producersAsJsonString);
+
+         JsonObject jsonSession = jsonArray.getJsonObject(0);
+
+         System.out.println("jsonSession = " + jsonSession);

Review Comment:
   logger.debug("jsonSession = {}",  jsonSession); ?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -704,9 +708,9 @@ public List<MessageReference> cancelRefs(final boolean failed,
          final List<MessageReference> refs = new ArrayList<>(deliveringRefs.size());
          MessageReference ref;
          while ((ref = deliveringRefs.poll()) != null) {
+            metrics.addAcknowledge(ref.getMessage().getEncodeSize());

Review Comment:
   This one doesnt look to be synchronized on the consumer the way the others are, worth looking at as it could mean thread safety issues corrupting the metrics updates since it does stuff like -= and ++ on the values.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -4023,6 +4565,114 @@ public void testListProducers() throws Exception {
       }
    }
 
+   @Test
+   public void testListProducersMessageCounts() throws Exception {

Review Comment:
   Doing some tests with AMQP which has actual producer elements would be good, vs only using Core which doesnt really seem to. Actually, using the Core JMS client as well as or instead of just the Core client would probably make sense too.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -2769,27 +2775,554 @@ public void testListAllConsumersAsJSON() throws Exception {
       JsonObject first = sorted[0];
       JsonObject second = sorted[1];
 
-      Assert.assertTrue(first.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(first.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(first.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory.getConnection().getID().toString(), first.getString("connectionID"));
-      Assert.assertTrue(first.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session).getName(), first.getString("sessionID"));
-      Assert.assertTrue(first.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), first.getString("queueName"));
-      Assert.assertEquals(false, first.getBoolean("browseOnly"));
-      Assert.assertEquals(0, first.getJsonNumber("deliveringCount").longValue());
+      Assert.assertTrue(first.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue() > 0);
+      Assert.assertNotNull(first.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      Assert.assertTrue(first.getString(ConsumerField.CONNECTION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(factory.getConnection().getID().toString(), first.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      Assert.assertTrue(first.getString(ConsumerField.SESSION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(((ClientSessionImpl) session).getName(), first.getString(ConsumerField.SESSION.getAlternativeName()));
+      Assert.assertTrue(first.getString(ConsumerField.QUEUE.getAlternativeName()).length() > 0);
+      Assert.assertEquals(queueName.toString(), first.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, first.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
+
+      Assert.assertTrue(second.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue() > 0);
+      Assert.assertNotNull(second.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      Assert.assertTrue(second.getString(ConsumerField.CONNECTION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(factory2.getConnection().getID().toString(), second.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      Assert.assertTrue(second.getString(ConsumerField.SESSION.getAlternativeName()).length() > 0);
+      Assert.assertEquals(((ClientSessionImpl) session2).getName(), second.getString(ConsumerField.SESSION.getAlternativeName()));
+      Assert.assertTrue(second.getString(ConsumerField.QUEUE.getAlternativeName()).length() > 0);
+      Assert.assertEquals(queueName.toString(), second.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, second.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, second.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
 
-      Assert.assertTrue(second.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(second.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(second.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory2.getConnection().getID().toString(), second.getString("connectionID"));
-      Assert.assertTrue(second.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session2).getName(), second.getString("sessionID"));
-      Assert.assertTrue(second.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), second.getString("queueName"));
-      Assert.assertEquals(false, second.getBoolean("browseOnly"));
-      Assert.assertEquals(0, second.getJsonNumber("deliveringCount").longValue());
+   }
+
+
+   @Test
+   public void testListAllConsumersAsJSONTXCommit() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
+      } else {
+         server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, -1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);
+
+      ClientProducer producer = session.createProducer(queueName);
+      int size = 0;
+      ClientMessage receive = null;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         size += message.getEncodeSize();
+         receive = consumer.receive();
+      }
+
+      String jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      JsonArray array = JsonUtil.readJsonArray(jsonString);
+      JsonObject first = (JsonObject) array.get(0);
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+      receive.acknowledge();
+      session.commit();
+
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+
+      int allSize = size;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         allSize += message.getEncodeSize();
+         receive = consumer.receive();
+      }
+
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(200, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(allSize, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+   }
+
+   @Test
+   public void testListAllConsumersAsJSONTXCommitAck() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
+      } else {
+         server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, -1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);
+
+      ClientProducer producer = session.createProducer(queueName);
+      int size = 0;
+      ClientMessage receive = null;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         size += message.getEncodeSize();
+         receive = consumer.receive();
+         receive.acknowledge();
+      }
+
+      String jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      JsonArray array = JsonUtil.readJsonArray(jsonString);
+      JsonObject first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+      session.commit();
+
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+   }
+
+   @Test
+   public void testListAllConsumersAsJSONTXRollback() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
+      } else {
+         server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, -1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);
+
+      ClientProducer producer = session.createProducer(queueName);
+      int size = 0;
+      ClientMessage receive = null;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         size += message.getEncodeSize();
+         receive = consumer.receive();
+      }
+
+      String jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      JsonArray array = JsonUtil.readJsonArray(jsonString);
+      JsonObject first = (JsonObject) array.get(0);
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+      receive.acknowledge();   //stop the session so we dont receive the same messages
+      session.stop();
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(100, first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+
+      session.rollback();
+     // Thread.sleep(1000);

Review Comment:
   Delete? Same in other instances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org