You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2019/11/20 09:42:57 UTC

[qpid-broker-j] branch master updated: QPID-8373 : Add deliveredToConsumerId to MessageInfo

This is an automated email from the ASF dual-hosted git repository.

rgodfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ef9728  QPID-8373 : Add deliveredToConsumerId to MessageInfo
5ef9728 is described below

commit 5ef97289cea7b797e0c50fbeb6f2c91361da0b00
Author: Robert Godfrey <rg...@apache.org>
AuthorDate: Wed Nov 20 10:42:41 2019 +0100

    QPID-8373 : Add deliveredToConsumerId to MessageInfo
---
 .../apache/qpid/server/message/MessageInfo.java    |  1 +
 .../qpid/server/message/MessageInfoImpl.java       | 10 +++++
 .../tests/http/endtoend/message/MessageTest.java   | 49 ++++++++++++++++++++--
 3 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
index b0e720b..d0fdce5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
@@ -36,6 +36,7 @@ public interface MessageInfo extends ManagedAttributeValue
     int getDeliveryCount();
     String getState();
     String getDeliveredTo();
+    String getDeliveredToConsumerId();
     Date getArrivalTime();
     boolean isPersistent();
     String getMessageId();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
index f495361..252950e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
@@ -25,6 +25,8 @@ import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.qpid.server.model.Consumer;
+
 public class MessageInfoImpl implements MessageInfo
 {
     private final String _deliveredTo;
@@ -52,6 +54,7 @@ public class MessageInfoImpl implements MessageInfo
     private final Date _notValidBefore;
     private final String _messageType;
     private final String _groupId;
+    private final String _deliveredToConsumerId;
 
     public MessageInfoImpl(final MessageInstance instance, final boolean includeHeaders)
     {
@@ -60,6 +63,7 @@ public class MessageInfoImpl implements MessageInfo
 
         MessageInstanceConsumer<?> acquiringConsumer = instance.getAcquiringConsumer();
         _deliveredTo = acquiringConsumer == null ? null : String.valueOf(acquiringConsumer.getIdentifier());
+        _deliveredToConsumerId = (acquiringConsumer instanceof Consumer) ?  String.valueOf(((Consumer<?,?>)acquiringConsumer).getId()) : null;
         _arrivalTime = message.getArrivalTime() == 0L ? null : new Date(message.getArrivalTime());
         _messageType = message.getMessageType();
         _persistent = message.isPersistent();
@@ -150,6 +154,12 @@ public class MessageInfoImpl implements MessageInfo
     }
 
     @Override
+    public String getDeliveredToConsumerId()
+    {
+        return _deliveredToConsumerId;
+    }
+
+    @Override
     public Date getArrivalTime()
     {
         return _arrivalTime == null ? null : new Date(_arrivalTime.getTime());
diff --git a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageTest.java b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageTest.java
index d1661e6..36afeed 100644
--- a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageTest.java
+++ b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageTest.java
@@ -32,8 +32,6 @@ import static org.hamcrest.Matchers.typeCompatibleWith;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeThat;
 
-import java.io.InputStream;
-import java.net.HttpURLConnection;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -57,7 +55,6 @@ import javax.jms.TextMessage;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -153,6 +150,52 @@ public class MessageTest extends HttpTestBase
 
 
     @Test
+    public void getAcquiredMessage() throws Exception
+    {
+
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(QUEUE_NAME);
+
+            MessageProducer producer = session.createProducer(queue);
+            Message jmsMessage = session.createMessage();
+            producer.send(jmsMessage);
+
+            List<Map<String, Object>> messages = getHelper().postJson("queue/myqueue/getMessageInfo",
+                                                                      Collections.singletonMap("includeHeaders",
+                                                                                               Boolean.FALSE),
+                                                                      LIST_MAP_TYPE_REF, SC_OK);
+            assertThat(messages.size(), is(equalTo(1)));
+            Map<String, Object> message = messages.get(0);
+
+            assertThat(message.get("deliveredToConsumerId"), is(nullValue()));
+            connection.start();
+            MessageConsumer consumer = session.createConsumer(queue);
+            jmsMessage = consumer.receive(5000);
+            assertThat(jmsMessage, is(notNullValue()));
+
+            messages = getHelper().postJson("queue/myqueue/getMessageInfo",
+                                                                      Collections.singletonMap("includeHeaders",
+                                                                                               Boolean.FALSE),
+                                                                      LIST_MAP_TYPE_REF, SC_OK);
+            assertThat(messages.size(), is(equalTo(1)));
+
+            message = messages.get(0);
+
+            assertThat(message.get("deliveredToConsumerId"), is(notNullValue()));
+        }
+        finally
+        {
+            connection.close();
+        }
+
+    }
+
+
+
+    @Test
     public void getJmsMapMessage() throws Exception
     {
         final String mapKey = "key";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org