You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/09/22 16:46:26 UTC

svn commit: r1761977 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ sys...

Author: kwall
Date: Thu Sep 22 16:46:26 2016
New Revision: 1761977

URL: http://svn.apache.org/viewvc?rev=1761977&view=rev
Log:
QPID-7366: [Java Broker] REST message publication

* added supporting system tests
* guarded against messages published with no headers at all
* allowed the internal/0-10 converter to convert messageId, if it is a validly formatted UUID

Added:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Thu Sep 22 16:46:26 2016
@@ -209,7 +209,10 @@ public interface VirtualHost<X extends V
     @ManagedOperation(nonModifying = true)
     Connection<?> getConnection(@Param(name="name") String name);
 
-    @ManagedOperation(secure = true)
+    @ManagedOperation(secure = true,
+                      description = "Publishes a message to a specified address. "
+                                    + "Returns the number of queues onto which it has been placed, "
+                                    + " or zero, if the address routes to no queues.")
     int publishMessage(@Param(name = "message")ManageableMessage message);
 
     @ManagedOperation(nonModifying = true, description = "Extract configuration", paramRequiringSecure = "includeSecureAttributes")

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Sep 22 16:46:26 2016
@@ -1664,25 +1664,30 @@ public abstract class AbstractVirtualHos
         @Override
         public Object getHeader(final String name)
         {
-            return _message.getHeaders().get(name);
+            return getHeaders().get(name);
         }
 
         @Override
         public boolean containsHeaders(final Set<String> names)
         {
-            return _message.getHeaders().keySet().containsAll(names);
+            return getHeaders().keySet().containsAll(names);
         }
 
         @Override
         public boolean containsHeader(final String name)
         {
-            return _message.getHeaders().keySet().contains(name);
+            return getHeaders().keySet().contains(name);
         }
 
         @Override
         public Collection<String> getHeaderNames()
         {
-            return Collections.unmodifiableCollection(_message.getHeaders().keySet());
+            return Collections.unmodifiableCollection(getHeaders().keySet());
+        }
+
+        private Map<String, Object> getHeaders()
+        {
+            return _message.getHeaders() == null ? Collections.<String, Object>emptyMap() : _message.getHeaders();
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Thu Sep 22 16:46:26 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.UUID;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.internal.InternalMessage;
@@ -108,8 +109,6 @@ public class MessageConverter_Internal_t
         DeliveryProperties deliveryProps = new DeliveryProperties();
         MessageProperties messageProps = new MessageProperties();
 
-
-
         deliveryProps.setExpiration(serverMsg.getExpiration());
         deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
         deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
@@ -118,11 +117,22 @@ public class MessageConverter_Internal_t
         messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
         messageProps.setContentLength(size);
         messageProps.setContentType(bodyMimeType);
-        if(serverMsg.getMessageHeader().getCorrelationId() != null)
+        if (serverMsg.getMessageHeader().getCorrelationId() != null)
         {
             messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
         }
         messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeaderMap());
+        if (serverMsg.getMessageHeader().getMessageId() != null)
+        {
+            try
+            {
+                messageProps.setMessageId(UUID.fromString(serverMsg.getMessageHeader().getMessageId()));
+            }
+            catch (IllegalArgumentException iae)
+            {
+                // ignore message id is not a UUID
+            }
+        }
         Header header = new Header(deliveryProps, messageProps, null);
         return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
     }

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Thu Sep 22 16:46:26 2016
@@ -398,6 +398,14 @@ public class RestTestHelper
         return readJsonResponse(connection, valueType);
     }
 
+    public <T> T postJson(String path, final Object data , final Class<T> valueType) throws IOException
+    {
+        HttpURLConnection connection = openManagementConnection(path, "POST");
+        connection.connect();
+        writeJsonRequest(connection, data);
+        return readJsonResponse(connection, valueType);
+    }
+
     public void createNewGroupMember(String groupProviderName, String groupName, String memberName, int responseCode) throws IOException
     {
         HttpURLConnection connection = openManagementConnection(

Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java?rev=1761977&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java Thu Sep 22 16:46:26 2016
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.rest;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.port.HttpPort;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class PublishMessageRestTest extends QpidRestTestCase
+{
+
+    private Connection _connection;
+    private Session _session;
+    private String _queueName;
+    private MessageConsumer _consumer;
+    private String _publishMessageOpUrl;
+    private String _queueUrl;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _connection = getConnection();
+        _connection.start();
+
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _queueName = getTestQueueName();
+        Destination queue = _session.createQueue(_queueName);
+        _consumer = _session.createConsumer(queue);
+
+        _publishMessageOpUrl = String.format("virtualhost/%s/%s/publishMessage", TEST1_VIRTUALHOST, TEST1_VIRTUALHOST);
+        _queueUrl = String.format("queue/%s/%s/", TEST1_VIRTUALHOST, TEST1_VIRTUALHOST);
+    }
+
+    @Override
+    protected void customizeConfiguration() throws Exception
+    {
+        super.customizeConfiguration();
+        getDefaultBrokerConfiguration().setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT,
+                                                           HttpPort.ALLOW_CONFIDENTIAL_OPERATIONS_ON_INSECURE_CHANNELS,
+                                                           true);
+    }
+
+    public void testPublishMinimalEmptyMessage() throws Exception
+    {
+        Map<String, Object> messageBody = new HashMap<>();
+        messageBody.put("address", _queueName);
+
+        getRestTestHelper().submitRequest(_publishMessageOpUrl,
+                                          "POST",
+                                          Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK);
+
+        Message message = _consumer.receive(getLongReceiveTimeout());
+        assertNotNull("Expected message not received", message);
+        assertNull("Unexpected JMSMessageID", message.getJMSMessageID());
+        assertNull("Unexpected JMSCorrelationID", message.getJMSCorrelationID());
+        assertEquals("Unexpected JMSExpiration", 0, message.getJMSExpiration());
+        assertNotSame("Unexpected JMSTimestamp", 0, message.getJMSTimestamp());
+        assertFalse("Unexpected number of mesage properties", message.getPropertyNames().hasMoreElements());
+    }
+
+    public void testPublishMessageWithPropertiesAndHeaders() throws Exception
+    {
+        final String messageId = UUID.randomUUID().toString();
+        final long tomorrow = TimeUnit.DAYS.toMillis(1) + System.currentTimeMillis();
+        final Map<String, Object> headers =  new HashMap<>();
+        headers.put("stringprop", "mystring");
+        headers.put("intprop", Integer.MIN_VALUE);
+        headers.put("longprop", Long.MAX_VALUE);
+        //headers.put("", "mykeyisempty"); // 0-8..0-91 Causes Broker to die (MessageConverter_Internal_to_v0_8), 0-10 fails in JMS client on receipt
+        //headers.put("nullpropvalue", null); // 0-8..0-91 Causes Broker failure (MessageConverter_Internal_to_v0_8), 0-10 JMS client ignores property
+
+        final Map<String, Object> messageBody = new HashMap<>();
+        messageBody.put("messageId", messageId);
+        messageBody.put("address", _queueName);
+        messageBody.put("expiration", tomorrow);
+        messageBody.put("headers", headers);
+
+        getRestTestHelper().submitRequest(_publishMessageOpUrl,
+                                          "POST",
+                                          Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK);
+
+        Message message = _consumer.receive(getLongReceiveTimeout());
+        assertNotNull("Expected message not received", message);
+        final String jmsMessageID = message.getJMSMessageID().replaceFirst("ID:", "");
+        assertEquals("Unexpected JMSMessageID", messageId, jmsMessageID);
+        assertEquals("Unexpected JMSExpiration", tomorrow, message.getJMSExpiration());
+
+        final Enumeration propertyEnumeration = message.getPropertyNames();
+        int count = 0;
+        while(propertyEnumeration.hasMoreElements())
+        {
+            String key = (String) propertyEnumeration.nextElement();
+            assertEquals("Unexpected property value fo key : " + key, headers.get(key), message.getObjectProperty(key));
+            count++;
+        }
+        assertEquals("Unexpected number of properties", headers.size(), count);
+    }
+
+    public void testPublishStringMessage() throws Exception
+    {
+        final String content = "Hello world";
+        TextMessage message = publishMessageWithContent(content, TextMessage.class);
+        assertEquals("Unexpected message content", content, message.getText());
+    }
+
+    public void testPublishMapMessage() throws Exception
+    {
+        final Map<String, Object> content = new HashMap<>();
+        content.put("key1", "astring");
+        content.put("key2", Integer.MIN_VALUE);
+        content.put("key3", Long.MAX_VALUE);
+        content.put("key4", null);
+        MapMessage message = publishMessageWithContent(content, MapMessage.class);
+        final Enumeration mapNames = message.getMapNames();
+        int entryCount = 0;
+        while(mapNames.hasMoreElements())
+        {
+            String key = (String) mapNames.nextElement();
+            assertEquals("Unexpected map content for key : " + key, content.get(key), message.getObject(key));
+            entryCount++;
+        }
+        assertEquals("Unexpected number of key/value pairs in map message", content.size(), entryCount);
+    }
+
+    public void testPublishRouting() throws Exception
+    {
+        final String queueName = UUID.randomUUID().toString();
+        Map<String, Object> messageBody = Collections.<String, Object>singletonMap("address", queueName);
+
+        int enqueues = getRestTestHelper().postJson(_publishMessageOpUrl,
+                                                    Collections.singletonMap("message", messageBody),
+                                                    Integer.class);
+        assertEquals("Unexpected number of enqueues", 0, enqueues);
+
+        getRestTestHelper().submitRequest(_queueUrl, "POST", Collections.singletonMap(Queue.NAME, queueName), HttpServletResponse.SC_CREATED);
+
+        enqueues = getRestTestHelper().postJson(_publishMessageOpUrl,
+                                                Collections.singletonMap("message", messageBody),
+                                                Integer.class);
+
+
+        assertEquals("Unexpected number of enqueues after queue creation", 1, enqueues);
+    }
+
+    private <M extends Message> M publishMessageWithContent(final Object content, final Class<M> expectedMessageClass) throws Exception
+    {
+        Map<String, Object> messageBody = new HashMap<>();
+        messageBody.put("address", _queueName);
+        messageBody.put("content", content);
+
+        getRestTestHelper().submitRequest(_publishMessageOpUrl,
+                                          "POST",
+                                          Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK);
+
+        M message = (M) _consumer.receive(getLongReceiveTimeout());
+        assertNotNull("Expected message not received", message);
+        assertTrue(String.format("Unexpected message type. Expecting %s got %s", expectedMessageClass, message.getClass()),
+                   expectedMessageClass.isAssignableFrom(message.getClass()));
+        return message;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org