You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/09/06 08:17:59 UTC
svn commit: r1759379 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/
broker-pl...
Author: orudyy
Date: Tue Sep 6 08:17:59 2016
New Revision: 1759379
URL: http://svn.apache.org/viewvc?rev=1759379&view=rev
Log:
QPID-7408: [Java Broker] Prevent double compression and decompress compressed message content if compression is unsupported on REST client
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java?rev=1759379&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java Tue Sep 6 08:17:59 2016
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.model;
+
+import java.io.InputStream;
+
+public interface StreamingContent
+{
+ InputStream getInputStream();
+}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1759379&r1=1759378&r2=1759379&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Sep 6 08:17:59 2016
@@ -19,6 +19,7 @@
package org.apache.qpid.server.queue;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -104,6 +105,7 @@ import org.apache.qpid.server.util.MapVa
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
+import org.apache.qpid.streams.CompositeInputStream;
import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -2682,7 +2684,7 @@ public abstract class AbstractQueue<X ex
}
}
- class MessageContent implements Content, CustomRestHeaders
+ class MessageContent implements Content, CustomRestHeaders, StreamingContent
{
private static final int UNLIMITED = -1;
private final MessageReference<?> _messageReference;
@@ -2757,6 +2759,36 @@ public abstract class AbstractQueue<X ex
throw new RuntimeException("JVM does not support UTF8", e);
}
}
+
+ @Override
+ public InputStream getInputStream()
+ {
+ ServerMessage message = _messageReference.getMessage();
+ final Collection<QpidByteBuffer> content = message.getContent(0, (int) _limit);
+ Collection<InputStream> streams = new ArrayList<>(content.size());
+ for (QpidByteBuffer b : content)
+ {
+ streams.add(b.asInputStream());
+ }
+ return new CompositeInputStream(streams)
+ {
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ super.close();
+ }
+ finally
+ {
+ for (QpidByteBuffer b : content)
+ {
+ b.dispose();
+ }
+ }
+ }
+ };
+ }
}
private static class AcquireAllQueueEntryFilter implements QueueEntryFilter
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java?rev=1759379&r1=1759378&r2=1759379&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java Tue Sep 6 08:17:59 2016
@@ -62,9 +62,9 @@ public class HttpManagementUtil
private static final String ATTR_SUBJECT = "Qpid.subject";
private static final String ATTR_LOG_ACTOR = "Qpid.logActor";
- private static final String ACCEPT_ENCODING_HEADER = "Accept-Encoding";
- private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
- private static final String GZIP_CONTENT_ENCODING = "gzip";
+ public static final String ACCEPT_ENCODING_HEADER = "Accept-Encoding";
+ public static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+ public static final String GZIP_CONTENT_ENCODING = "gzip";
private static final Collection<HttpRequestPreemptiveAuthenticator> AUTHENTICATORS;
private static final Operation MANAGE_ACTION = Operation.ACTION("manage");
@@ -178,9 +178,7 @@ public class HttpManagementUtil
throws IOException
{
OutputStream outputStream;
- if(managementConfiguration.isCompressResponses()
- && Collections.list(request.getHeaderNames()).contains(ACCEPT_ENCODING_HEADER)
- && request.getHeader(ACCEPT_ENCODING_HEADER).contains(GZIP_CONTENT_ENCODING))
+ if(isCompressing(request, managementConfiguration))
{
outputStream = new GZIPOutputStream(response.getOutputStream());
response.setHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_ENCODING);
@@ -192,6 +190,14 @@ public class HttpManagementUtil
return outputStream;
}
+ public static boolean isCompressing(final HttpServletRequest request,
+ final HttpManagementConfiguration managementConfiguration)
+ {
+ return managementConfiguration.isCompressResponses()
+ && Collections.list(request.getHeaderNames()).contains(ACCEPT_ENCODING_HEADER)
+ && request.getHeader(ACCEPT_ENCODING_HEADER).contains(GZIP_CONTENT_ENCODING);
+ }
+
public static String ensureFilenameIsRfc2183(final String requestedFilename)
{
return requestedFilename.replaceAll("[\\P{InBasic_Latin}\\\\:/\\p{Cntrl}]", "");
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java?rev=1759379&r1=1759378&r2=1759379&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java Tue Sep 6 08:17:59 2016
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.management.plugin.servlet.rest;
+import static org.apache.qpid.server.management.plugin.HttpManagementUtil.CONTENT_ENCODING_HEADER;
+import static org.apache.qpid.server.management.plugin.HttpManagementUtil.GZIP_CONTENT_ENCODING;
+
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
@@ -29,6 +32,7 @@ import java.security.PrivilegedException
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.zip.GZIPInputStream;
import javax.security.auth.Subject;
import javax.servlet.ServletConfig;
@@ -44,6 +48,7 @@ import org.apache.qpid.server.model.Rest
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +56,7 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObjectJacksonModule;
+import org.apache.qpid.server.model.StreamingContent;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public abstract class AbstractServlet extends HttpServlet
@@ -319,47 +325,70 @@ public abstract class AbstractServlet ex
response.setDateHeader("Expires", 0);
}
- protected void writeTypedContent(Content content, HttpServletRequest request, HttpServletResponse response) throws IOException
+ protected void writeTypedContent(Content content, HttpServletRequest request, HttpServletResponse response)
+ throws IOException
{
- try(OutputStream os = getOutputStream(request, response))
+ Map<String, Object> headers = getResponseHeaders(content);
+ boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
+ boolean isCompressing = HttpManagementUtil.isCompressing(request, _managementConfiguration);
+ try (OutputStream os = isGzipCompressed ? response.getOutputStream() : getOutputStream(request, response))
{
- if(content instanceof CustomRestHeaders)
+ if (isGzipCompressed && !isCompressing && content instanceof StreamingContent)
{
- setResponseHeaders(response, (CustomRestHeaders) content);
+ headers.remove(CONTENT_ENCODING_HEADER);
+ content = new DecompressingContent((StreamingContent) content);
}
response.setStatus(HttpServletResponse.SC_OK);
+ for (Map.Entry<String, Object> entry : headers.entrySet())
+ {
+ response.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
+ }
content.write(os);
-
}
- catch(IOException e)
+ catch (IOException e)
{
LOGGER.warn("Unexpected exception processing request", e);
sendJsonErrorResponse(request, response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, e.getMessage());
}
}
- private void setResponseHeaders(final HttpServletResponse response, final CustomRestHeaders customRestHeaders)
+ private Map<String, Object> getResponseHeaders(final Object content)
{
- Map<RestContentHeader, Method> contentHeaderGetters = getContentHeaderMethods(customRestHeaders);
- if (contentHeaderGetters != null)
+ Map<String, Object> headers = Collections.emptyMap();
+ if (content instanceof CustomRestHeaders)
{
- for (Map.Entry<RestContentHeader, Method> entry : contentHeaderGetters.entrySet())
+ CustomRestHeaders customRestHeaders = (CustomRestHeaders) content;
+ Map<RestContentHeader, Method> contentHeaderGetters = getContentHeaderMethods(customRestHeaders);
+ if (contentHeaderGetters != null)
{
- final String headerName = entry.getKey().value();
- try
+ headers = new HashMap<>();
+ for (Map.Entry<RestContentHeader, Method> entry : contentHeaderGetters.entrySet())
{
- final Object headerValue = entry.getValue().invoke(customRestHeaders);
- if (headerValue != null)
+ final String headerName = entry.getKey().value();
+ try
{
- response.setHeader(headerName, String.valueOf(headerValue));
+ final Object headerValue = entry.getValue().invoke(customRestHeaders);
+ if (headerValue != null)
+ {
+ headers.put(headerName.toUpperCase(), headerValue);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Unexpected exception whilst setting response header " + headerName, e);
}
- }
- catch (Exception e)
- {
- LOGGER.warn("Unexpected exception whilst setting response header " + headerName, e);
}
}
+ }
+ return headers;
+ }
+ private void setResponseHeaders(final HttpServletResponse response, final CustomRestHeaders customRestHeaders)
+ {
+ Map<String, Object> headers = getResponseHeaders(customRestHeaders);
+ for(Map.Entry<String,Object> entry : headers.entrySet())
+ {
+ response.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
@@ -387,4 +416,31 @@ public abstract class AbstractServlet ex
return results;
}
+ private class DecompressingContent implements Content
+ {
+ private final StreamingContent _content;
+
+ public DecompressingContent(final StreamingContent content)
+ {
+ _content = content;
+ }
+
+ @Override
+ public void write(final OutputStream os) throws IOException
+ {
+ try(GZIPInputStream gzipInputStream = new GZIPInputStream(_content.getInputStream());)
+ {
+ ByteStreams.copy(gzipInputStream, os);
+ }
+ }
+
+ @Override
+ public void release()
+ {
+ if (_content instanceof Content)
+ {
+ ((Content)_content).release();
+ }
+ }
+ }
}
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1759379&r1=1759378&r2=1759379&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Tue Sep 6 08:17:59 2016
@@ -93,6 +93,7 @@ public class RestTestHelper
private String _keystorePassword;
private String _clientAuthAlias;
+ private String _acceptEncoding;
public RestTestHelper(int httpPort)
{
@@ -232,6 +233,11 @@ public class RestTestHelper
httpCon.setRequestProperty("Authorization", "Basic " + encoded);
}
+ if (_acceptEncoding != null && !"".equals(_acceptEncoding))
+ {
+ httpCon.setRequestProperty("Accept-Encoding", _acceptEncoding);
+ }
+
httpCon.setDoOutput(true);
httpCon.setRequestMethod(method);
return httpCon;
@@ -672,4 +678,14 @@ public class RestTestHelper
}
return nodeAttributes;
}
+
+ public String getAcceptEncoding()
+ {
+ return _acceptEncoding;
+ }
+
+ public void setAcceptEncoding(String acceptEncoding)
+ {
+ _acceptEncoding = acceptEncoding;
+ }
}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java?rev=1759379&r1=1759378&r2=1759379&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java Tue Sep 6 08:17:59 2016
@@ -20,9 +20,15 @@
*/
package org.apache.qpid.systest;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.zip.GZIPInputStream;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -36,6 +42,8 @@ import org.apache.qpid.client.AMQConnect
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.port.HttpPort;
import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -55,6 +63,9 @@ public class MessageCompressionTest exte
{
TestBrokerConfiguration config = getDefaultBrokerConfiguration();
config.addHttpManagementConfiguration();
+ config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT,
+ HttpPort.ALLOW_CONFIDENTIAL_OPERATIONS_ON_INSECURE_CHANNELS,
+ true);
super.startDefaultBroker();
_restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
@@ -116,32 +127,9 @@ public class MessageCompressionTest exte
Connection senderConnection = getConnection(senderCompresses);
String virtualPath = getConnectionFactory().getVirtualPath();
String testQueueName = getTestQueueName();
+ createAndBindQueue(virtualPath, testQueueName);
- // create the queue using REST and bind it
- assertEquals(201,
- _restTestHelper.submitRequest("/api/latest/queue"
- + virtualPath
- + virtualPath
- + "/"
- + testQueueName, "PUT", Collections.<String, Object>emptyMap()));
- assertEquals(201,
- _restTestHelper.submitRequest("/api/latest/binding"
- + virtualPath
- + virtualPath
- + "/amq.direct/"
- + testQueueName
- + "/"
- + testQueueName, "PUT", Collections.<String, Object>emptyMap()));
-
- Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // send a large message
- MessageProducer producer = session.createProducer(getTestQueue());
- TextMessage sentMessage = session.createTextMessage(messageText);
- sentMessage.setStringProperty("bar", "foo");
-
- producer.send(sentMessage);
- ((AMQSession)session).sync();
+ publishMessage(senderConnection, messageText);
// get the number of bytes received at the broker on the connection
List<Map<String, Object>> connectionRestOutput = _restTestHelper.getJsonAsList("/api/latest/connection");
@@ -163,7 +151,7 @@ public class MessageCompressionTest exte
// receive the message
Connection consumerConnection = getConnection(receiverUncompresses);
- session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(getTestQueue());
consumerConnection.start();
@@ -192,6 +180,105 @@ public class MessageCompressionTest exte
consumerConnection.close();
}
+ public void testGetContentViaRestForCompressedMessageWithAgentNotSupportingCompression() throws Exception
+ {
+ setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, String.valueOf(true));
+
+ doActualSetUp();
+
+ String messageText = createMessageText();
+ Connection senderConnection = getConnection(true);
+ String virtualPath = getConnectionFactory().getVirtualPath();
+ String testQueueName = getTestQueueName();
+ createAndBindQueue(virtualPath, testQueueName);
+
+ publishMessage(senderConnection, messageText);
+
+ String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueueName;
+
+ List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
+ assertEquals("Unexpected number of messages", 1, messages.size());
+ long id = ((Number) messages.get(0).get("id")).longValue();
+
+ byte[] messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?messageId=" + id);
+ String content = new String(messageBytes, StandardCharsets.UTF_8);
+ assertEquals("Unexpected message content :" + content, messageText, content);
+ }
+
+ public void testGetContentViaRestForCompressedMessageWithAgentSupportingCompression() throws Exception
+ {
+ setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, String.valueOf(true));
+
+ doActualSetUp();
+
+ String messageText = createMessageText();
+ Connection senderConnection = getConnection(true);
+ String virtualPath = getConnectionFactory().getVirtualPath();
+ String testQueueName = getTestQueueName();
+ createAndBindQueue(virtualPath, testQueueName);
+
+ publishMessage(senderConnection, messageText);
+
+ String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueueName;
+
+ List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
+ assertEquals("Unexpected number of messages", 1, messages.size());
+ long id = ((Number) messages.get(0).get("id")).longValue();
+
+ _restTestHelper.setAcceptEncoding("gzip, deflate, br");
+ HttpURLConnection connection =
+ _restTestHelper.openManagementConnection(queueRelativePath + "/getMessageContent?messageId=" + id,
+ "GET");
+ connection.connect();
+
+ String content;
+ try (InputStream is = new GZIPInputStream(connection.getInputStream());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream())
+ {
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = is.read(buffer)) != -1)
+ {
+ baos.write(buffer, 0, len);
+ }
+ content = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+ }
+ assertEquals("Unexpected message content :" + content, messageText, content);
+ }
+
+ private void publishMessage(final Connection senderConnection, final String messageText)
+ throws JMSException, org.apache.qpid.QpidException
+ {
+ Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // send a large message
+ MessageProducer producer = session.createProducer(getTestQueue());
+ TextMessage sentMessage = session.createTextMessage(messageText);
+ sentMessage.setStringProperty("bar", "foo");
+
+ producer.send(sentMessage);
+ ((AMQSession) session).sync();
+ }
+
+ private void createAndBindQueue(final String virtualPath, final String testQueueName) throws IOException
+ {
+ // create the queue using REST and bind it
+ assertEquals(201,
+ _restTestHelper.submitRequest("/api/latest/queue"
+ + virtualPath
+ + virtualPath
+ + "/"
+ + testQueueName, "PUT", Collections.<String, Object>emptyMap()));
+ assertEquals(201,
+ _restTestHelper.submitRequest("/api/latest/binding"
+ + virtualPath
+ + virtualPath
+ + "/amq.direct/"
+ + testQueueName
+ + "/"
+ + testQueueName, "PUT", Collections.<String, Object>emptyMap()));
+ }
+
private String createMessageText()
{
StringBuilder stringBuilder = new StringBuilder();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org