You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/09/22 16:46:26 UTC
svn commit: r1761977 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/virtualhost/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
sys...
Author: kwall
Date: Thu Sep 22 16:46:26 2016
New Revision: 1761977
URL: http://svn.apache.org/viewvc?rev=1761977&view=rev
Log:
QPID-7366: [Java Broker] REST message publication
* added supporting system tests
* guarded against messages published with no headers at all
* allowed the internal/0-10 converter to convert the messageId, if it is a validly formatted UUID
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Thu Sep 22 16:46:26 2016
@@ -209,7 +209,10 @@ public interface VirtualHost<X extends V
@ManagedOperation(nonModifying = true)
Connection<?> getConnection(@Param(name="name") String name);
- @ManagedOperation(secure = true)
+ @ManagedOperation(secure = true,
+ description = "Publishes a message to a specified address. "
+ + "Returns the number of queues onto which it has been placed, "
+ + " or zero, if the address routes to no queues.")
int publishMessage(@Param(name = "message")ManageableMessage message);
@ManagedOperation(nonModifying = true, description = "Extract configuration", paramRequiringSecure = "includeSecureAttributes")
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Sep 22 16:46:26 2016
@@ -1664,25 +1664,30 @@ public abstract class AbstractVirtualHos
@Override
public Object getHeader(final String name)
{
- return _message.getHeaders().get(name);
+ return getHeaders().get(name);
}
@Override
public boolean containsHeaders(final Set<String> names)
{
- return _message.getHeaders().keySet().containsAll(names);
+ return getHeaders().keySet().containsAll(names);
}
@Override
public boolean containsHeader(final String name)
{
- return _message.getHeaders().keySet().contains(name);
+ return getHeaders().keySet().contains(name);
}
@Override
public Collection<String> getHeaderNames()
{
- return Collections.unmodifiableCollection(_message.getHeaders().keySet());
+ return Collections.unmodifiableCollection(getHeaders().keySet());
+ }
+
+ private Map<String, Object> getHeaders()
+ {
+ return _message.getHeaders() == null ? Collections.<String, Object>emptyMap() : _message.getHeaders();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Thu Sep 22 16:46:26 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import java.util.Collection;
import java.util.Collections;
+import java.util.UUID;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.internal.InternalMessage;
@@ -108,8 +109,6 @@ public class MessageConverter_Internal_t
DeliveryProperties deliveryProps = new DeliveryProperties();
MessageProperties messageProps = new MessageProperties();
-
-
deliveryProps.setExpiration(serverMsg.getExpiration());
deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
@@ -118,11 +117,22 @@ public class MessageConverter_Internal_t
messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
messageProps.setContentLength(size);
messageProps.setContentType(bodyMimeType);
- if(serverMsg.getMessageHeader().getCorrelationId() != null)
+ if (serverMsg.getMessageHeader().getCorrelationId() != null)
{
messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
}
messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeaderMap());
+ if (serverMsg.getMessageHeader().getMessageId() != null)
+ {
+ try
+ {
+ messageProps.setMessageId(UUID.fromString(serverMsg.getMessageHeader().getMessageId()));
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // ignore message id is not a UUID
+ }
+ }
Header header = new Header(deliveryProps, messageProps, null);
return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
}
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1761977&r1=1761976&r2=1761977&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Thu Sep 22 16:46:26 2016
@@ -398,6 +398,14 @@ public class RestTestHelper
return readJsonResponse(connection, valueType);
}
+ public <T> T postJson(String path, final Object data , final Class<T> valueType) throws IOException
+ {
+ HttpURLConnection connection = openManagementConnection(path, "POST");
+ connection.connect();
+ writeJsonRequest(connection, data);
+ return readJsonResponse(connection, valueType);
+ }
+
public void createNewGroupMember(String groupProviderName, String groupName, String memberName, int responseCode) throws IOException
{
HttpURLConnection connection = openManagementConnection(
Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java?rev=1761977&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java Thu Sep 22 16:46:26 2016
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.rest;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.port.HttpPort;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class PublishMessageRestTest extends QpidRestTestCase
+{
+
+ private Connection _connection;
+ private Session _session;
+ private String _queueName;
+ private MessageConsumer _consumer;
+ private String _publishMessageOpUrl;
+ private String _queueUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _connection = getConnection();
+ _connection.start();
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queueName = getTestQueueName();
+ Destination queue = _session.createQueue(_queueName);
+ _consumer = _session.createConsumer(queue);
+
+ _publishMessageOpUrl = String.format("virtualhost/%s/%s/publishMessage", TEST1_VIRTUALHOST, TEST1_VIRTUALHOST);
+ _queueUrl = String.format("queue/%s/%s/", TEST1_VIRTUALHOST, TEST1_VIRTUALHOST);
+ }
+
+ @Override
+ protected void customizeConfiguration() throws Exception
+ {
+ super.customizeConfiguration();
+ getDefaultBrokerConfiguration().setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT,
+ HttpPort.ALLOW_CONFIDENTIAL_OPERATIONS_ON_INSECURE_CHANNELS,
+ true);
+ }
+
+ public void testPublishMinimalEmptyMessage() throws Exception
+ {
+ Map<String, Object> messageBody = new HashMap<>();
+ messageBody.put("address", _queueName);
+
+ getRestTestHelper().submitRequest(_publishMessageOpUrl,
+ "POST",
+ Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK);
+
+ Message message = _consumer.receive(getLongReceiveTimeout());
+ assertNotNull("Expected message not received", message);
+ assertNull("Unexpected JMSMessageID", message.getJMSMessageID());
+ assertNull("Unexpected JMSCorrelationID", message.getJMSCorrelationID());
+ assertEquals("Unexpected JMSExpiration", 0, message.getJMSExpiration());
+ assertNotSame("Unexpected JMSTimestamp", 0, message.getJMSTimestamp());
+ assertFalse("Unexpected number of mesage properties", message.getPropertyNames().hasMoreElements());
+ }
+
+ public void testPublishMessageWithPropertiesAndHeaders() throws Exception
+ {
+ final String messageId = UUID.randomUUID().toString();
+ final long tomorrow = TimeUnit.DAYS.toMillis(1) + System.currentTimeMillis();
+ final Map<String, Object> headers = new HashMap<>();
+ headers.put("stringprop", "mystring");
+ headers.put("intprop", Integer.MIN_VALUE);
+ headers.put("longprop", Long.MAX_VALUE);
+ //headers.put("", "mykeyisempty"); // 0-8..0-91 Causes Broker to die (MessageConverter_Internal_to_v0_8), 0-10 fails in JMS client on receipt
+ //headers.put("nullpropvalue", null); // 0-8..0-91 Causes Broker failure (MessageConverter_Internal_to_v0_8), 0-10 JMS client ignores property
+
+ final Map<String, Object> messageBody = new HashMap<>();
+ messageBody.put("messageId", messageId);
+ messageBody.put("address", _queueName);
+ messageBody.put("expiration", tomorrow);
+ messageBody.put("headers", headers);
+
+ getRestTestHelper().submitRequest(_publishMessageOpUrl,
+ "POST",
+ Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK);
+
+ Message message = _consumer.receive(getLongReceiveTimeout());
+ assertNotNull("Expected message not received", message);
+ final String jmsMessageID = message.getJMSMessageID().replaceFirst("ID:", "");
+ assertEquals("Unexpected JMSMessageID", messageId, jmsMessageID);
+ assertEquals("Unexpected JMSExpiration", tomorrow, message.getJMSExpiration());
+
+ final Enumeration propertyEnumeration = message.getPropertyNames();
+ int count = 0;
+ while(propertyEnumeration.hasMoreElements())
+ {
+ String key = (String) propertyEnumeration.nextElement();
+ assertEquals("Unexpected property value fo key : " + key, headers.get(key), message.getObjectProperty(key));
+ count++;
+ }
+ assertEquals("Unexpected number of properties", headers.size(), count);
+ }
+
+ public void testPublishStringMessage() throws Exception
+ {
+ final String content = "Hello world";
+ TextMessage message = publishMessageWithContent(content, TextMessage.class);
+ assertEquals("Unexpected message content", content, message.getText());
+ }
+
+ public void testPublishMapMessage() throws Exception
+ {
+ final Map<String, Object> content = new HashMap<>();
+ content.put("key1", "astring");
+ content.put("key2", Integer.MIN_VALUE);
+ content.put("key3", Long.MAX_VALUE);
+ content.put("key4", null);
+ MapMessage message = publishMessageWithContent(content, MapMessage.class);
+ final Enumeration mapNames = message.getMapNames();
+ int entryCount = 0;
+ while(mapNames.hasMoreElements())
+ {
+ String key = (String) mapNames.nextElement();
+ assertEquals("Unexpected map content for key : " + key, content.get(key), message.getObject(key));
+ entryCount++;
+ }
+ assertEquals("Unexpected number of key/value pairs in map message", content.size(), entryCount);
+ }
+
+ public void testPublishRouting() throws Exception
+ {
+ final String queueName = UUID.randomUUID().toString();
+ Map<String, Object> messageBody = Collections.<String, Object>singletonMap("address", queueName);
+
+ int enqueues = getRestTestHelper().postJson(_publishMessageOpUrl,
+ Collections.singletonMap("message", messageBody),
+ Integer.class);
+ assertEquals("Unexpected number of enqueues", 0, enqueues);
+
+ getRestTestHelper().submitRequest(_queueUrl, "POST", Collections.singletonMap(Queue.NAME, queueName), HttpServletResponse.SC_CREATED);
+
+ enqueues = getRestTestHelper().postJson(_publishMessageOpUrl,
+ Collections.singletonMap("message", messageBody),
+ Integer.class);
+
+
+ assertEquals("Unexpected number of enqueues after queue creation", 1, enqueues);
+ }
+
+ private <M extends Message> M publishMessageWithContent(final Object content, final Class<M> expectedMessageClass) throws Exception
+ {
+ Map<String, Object> messageBody = new HashMap<>();
+ messageBody.put("address", _queueName);
+ messageBody.put("content", content);
+
+ getRestTestHelper().submitRequest(_publishMessageOpUrl,
+ "POST",
+ Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK);
+
+ M message = (M) _consumer.receive(getLongReceiveTimeout());
+ assertNotNull("Expected message not received", message);
+ assertTrue(String.format("Unexpected message type. Expecting %s got %s", expectedMessageClass, message.getClass()),
+ expectedMessageClass.isAssignableFrom(message.getClass()));
+ return message;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org