You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/12 14:34:54 UTC

svn commit: r1773795 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/plugin/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broke...

Author: rgodfrey
Date: Mon Dec 12 14:34:54 2016
New Revision: 1773795

URL: http://svn.apache.org/viewvc?rev=1773795&view=rev
Log:
QPID-7587 : Add support for enveloping messages from 0-x protocols using the 1.0 transport

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java
      - copied, changed from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java
      - copied, changed from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java   (with props)
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java   (with props)
Modified:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java (from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java&r1=1773722&r2=1773795&rev=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFormat.java Mon Dec 12 14:34:54 2016
@@ -20,14 +20,17 @@
  */
 package org.apache.qpid.server.plugin;
 
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.store.MessageStore;
 
-public interface MessageConverter<M extends ServerMessage, N extends ServerMessage> extends Pluggable
+public interface MessageFormat<M extends ServerMessage<?>> extends Pluggable // plugin that encodes/decodes one message class to/from a numbered message format
 {
-    Class<M> getInputClass();
-    Class<N> getOutputClass();
-
-    N convert(M message, NamedAddressSpace addressSpace);
-    void dispose(N message);
+    int getSupportedFormat(); // numeric message-format code; MessageFormatRegistry.getFormat(int) looks implementations up by this value
+    Class<M> getMessageClass(); // concrete message class; MessageFormatRegistry.getFormat(ServerMessage) looks implementations up by message.getClass()
+    List<QpidByteBuffer> convertToMessageFormat(M message); // serialises the message into a list of buffers in this format
+    M createMessage(List<QpidByteBuffer> buf, MessageStore store, final Object connectionReference); // decodes buffers in this format into a message backed by the given store
+    String getRoutingAddress(M message, String destinationAddress); // routing address to use when the message was sent to destinationAddress
 }

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java (from r1773722, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java&r1=1773722&r2=1773795&rev=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MessageFormatRegistry.java Mon Dec 12 14:34:54 2016
@@ -22,38 +22,40 @@
 package org.apache.qpid.server.protocol;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 
-public class MessageConverterRegistry
+public class MessageFormatRegistry // static lookup of MessageFormat plugins, by format code and by message class
 {
-    private static Map<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>> _converters =
-            new HashMap<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>>();
+    private static Map<Integer, MessageFormat<? extends ServerMessage<?>>> INPUT_CONVERTERS =
+            new HashMap<>(); // format code -> MessageFormat; populated only by the static initializer below
+
+    private static Map<Class<? extends ServerMessage<?>>, MessageFormat<? extends ServerMessage<?>>> OUTPUT_CONVERTERS =
+            new HashMap<>(); // message class -> MessageFormat; populated only by the static initializer below
+
 
     static
     {
-
-        for(MessageConverter<? extends ServerMessage, ? extends ServerMessage> converter : (new QpidServiceLoader()).instancesOf(MessageConverter.class))
+        for(MessageFormat<? extends ServerMessage<?>> converter : (new QpidServiceLoader()).instancesOf(MessageFormat.class)) // register every MessageFormat instance the service loader returns
         {
-            Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(converter.getInputClass());
-            if(map == null)
-            {
-                map = new HashMap<Class<? extends ServerMessage>, MessageConverter>();
-                _converters.put(converter.getInputClass(), map);
-            }
-            map.put(converter.getOutputClass(),converter);
+            INPUT_CONVERTERS.put(converter.getSupportedFormat(), converter); // a later plugin with the same format code replaces an earlier one
+            OUTPUT_CONVERTERS.put(converter.getMessageClass(), converter); // a later plugin with the same message class replaces an earlier one
         }
     }
 
-    public static <M  extends ServerMessage,N  extends ServerMessage> MessageConverter<M, N> getConverter(Class<M> from, Class<N> to)
+    public static MessageFormat<? extends ServerMessage<?>> getFormat(int format) // lookup by numeric format code
     {
-        Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(from);
-        if(map == null)
-        {
-            map = _converters.get(ServerMessage.class);
-        }
-        return map == null ? null : map.get(to);
+        return INPUT_CONVERTERS.get(format); // null when no plugin registered this format code
     }
+
+    public  static MessageFormat<? extends ServerMessage<?>> getFormat(ServerMessage<?> message) // lookup by the message's runtime class
+    {
+        return OUTPUT_CONVERTERS.get(message.getClass()); // exact-class match only (no superclass fallback); null when the class is not registered
+    }
+
 }

Added: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java?rev=1773795&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java Mon Dec 12 14:34:54 2016
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Struct;
+
+/**
+ * {@link MessageFormat} implementation for {@link MessageTransferMessage}s (type "AMQP_0_10").
+ * <p>
+ * Encoded layout: a 32-bit count of header structs, that many struct32-encoded header
+ * structs, then the message body.
+ */
+public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage>
+{
+
+    public static final int AMQP_MESSAGE_FORMAT_0_10 = 100;
+
+    /** Used when a message carries no header, so that it is encoded with a zero struct count. */
+    private static final Struct[] NO_STRUCTS = new Struct[0];
+
+    /** @return the plugin type name, {@code "AMQP_0_10"} */
+    @Override
+    public String getType()
+    {
+        return "AMQP_0_10";
+    }
+
+    /** @return {@link #AMQP_MESSAGE_FORMAT_0_10} */
+    @Override
+    public int getSupportedFormat()
+    {
+        return AMQP_MESSAGE_FORMAT_0_10;
+    }
+
+    /** @return the message class handled by this format, {@link MessageTransferMessage} */
+    @Override
+    public Class<MessageTransferMessage> getMessageClass()
+    {
+        return MessageTransferMessage.class;
+    }
+
+    // format: <int header count> <headers> <body>
+
+    /**
+     * Encodes the message as a header buffer followed by the message's content buffers.
+     *
+     * @param message the message to encode
+     * @return the encoded header buffer followed by the content of the message
+     */
+    @Override
+    public List<QpidByteBuffer> convertToMessageFormat(final MessageTransferMessage message)
+    {
+        ServerEncoder encoder = new ServerEncoder(4096, true);
+        final Header header = message.getHeader();
+        // Guard against a message without a header: encode it as zero structs instead of
+        // failing with a NullPointerException. createMessage() decodes a zero count as a
+        // Header whose properties are all null.
+        final Struct[] structs = header == null ? NO_STRUCTS : header.getStructs();
+        encoder.writeInt32(structs.length);
+        for(Struct struct : structs)
+        {
+            encoder.writeStruct32(struct);
+        }
+        final QpidByteBuffer headerBuf = encoder.getBuffer();
+        List<QpidByteBuffer> bufs = new ArrayList<>();
+        bufs.add(headerBuf);
+        bufs.addAll(message.getContent(0, (int) message.getSize()));
+
+        return bufs;
+    }
+
+    /**
+     * Decodes a message previously produced by {@link #convertToMessageFormat} and adds it
+     * to the given store.
+     *
+     * @param buf                 the encoded message; the header structs are read via a
+     *                            {@code ServerDecoder} constructed over these buffers and
+     *                            whatever then remains in them is stored as the body
+     * @param store               the store to which meta data and content are added
+     * @param connectionReference passed through to the created message
+     * @return the decoded message
+     * @throws ConnectionScopedRuntimeException if the buffers end before the header is complete
+     */
+    @Override
+    public MessageTransferMessage createMessage(final List<QpidByteBuffer> buf,
+                                                final MessageStore store,
+                                                final Object connectionReference)
+    {
+        try
+        {
+            ServerDecoder serverDecoder = new ServerDecoder(buf);
+            int headerCount = serverDecoder.readInt32();
+            DeliveryProperties deliveryProperties = null;
+            MessageProperties messageProperties = null;
+            List<Struct> nonStandard = null;
+            for(int i = 0; i<headerCount; i++)
+            {
+
+                final Struct struct = serverDecoder.readStruct32();
+                switch(struct.getStructType())
+                {
+                    case DeliveryProperties.TYPE:
+                        deliveryProperties = (DeliveryProperties)struct;
+                        break;
+                    case MessageProperties.TYPE:
+                        messageProperties = (MessageProperties)struct;
+                        break;
+                    default:
+                        // anything else is kept, in order, as a non-standard property struct
+                        if(nonStandard == null)
+                        {
+                            nonStandard = new ArrayList<>();
+                        }
+                        nonStandard.add(struct);
+                }
+            }
+            Header header = new Header(deliveryProperties, messageProperties, nonStandard);
+            // The body size is whatever is still unread in the buffers at this point.
+            int bodySize = 0;
+            for(QpidByteBuffer content : buf)
+            {
+                bodySize += content.remaining();
+            }
+            MessageMetaData_0_10 metaData = new MessageMetaData_0_10(header, bodySize, System.currentTimeMillis());
+            final MessageHandle<MessageMetaData_0_10> handle = store.addMessage(metaData);
+            for (QpidByteBuffer content : buf)
+            {
+                if (content.hasRemaining())
+                {
+                    handle.addContent(content);
+                }
+            }
+            final StoredMessage<MessageMetaData_0_10> storedMessage = handle.allContentAdded();
+            return new MessageTransferMessage(storedMessage, connectionReference);
+
+        }
+        catch (BufferUnderflowException e )
+        {
+            throw new ConnectionScopedRuntimeException("Error parsing AMQP 0-10 message format", e);
+        }
+    }
+
+
+    /**
+     * Returns the message's initial routing address, with a leading
+     * {@code destinationAddress + "/"} prefix removed when present.
+     *
+     * @param message            the message being routed
+     * @param destinationAddress the address of the destination the message was sent to; may be null
+     * @return the (possibly shortened) initial routing address; null if the message has none
+     */
+    @Override
+    public String getRoutingAddress(final MessageTransferMessage message, final String destinationAddress)
+    {
+        String initialRoutingAddress = message.getInitialRoutingAddress();
+        if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
+        {
+            initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1);
+        }
+        return initialRoutingAddress;
+    }
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Dec 12 14:34:54 2016
@@ -1007,6 +1007,37 @@ public class ServerSessionDelegate exten
             {
                 destination = addressSpace.getDefaultDestination();
             }
+            else
+            {
+                Header header = xfr.getHeader();
+                DeliveryProperties delvProps;
+                if(header == null)
+                {
+                    delvProps = new DeliveryProperties();
+                    header = new Header(delvProps, null, null);
+                    xfr.setHeader(header);
+                }
+                else if(header.getDeliveryProperties() == null)
+                {
+                    delvProps = new DeliveryProperties();
+                    header = new Header(delvProps, header.getMessageProperties(), header.getNonStandardProperties());
+                    xfr.setHeader(header);
+                }
+                else
+                {
+                    delvProps = header.getDeliveryProperties();
+                }
+                if(delvProps.getExchange() == null && !xfr.getDestination().equals(delvProps.getRoutingKey()))
+                {
+                    delvProps.setExchange(xfr.getDestination());
+                }
+            }
+        }
+        else if(xfr.getHeader() != null
+                && xfr.getHeader().getDeliveryProperties() != null
+                && xfr.getHeader().getDeliveryProperties().getExchange() != null)
+        {
+            destination = addressSpace.getAttainedMessageDestination(xfr.getHeader().getDeliveryProperties().getExchange());
         }
         else
         {

Added: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java?rev=1773795&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java Mon Dec 12 14:34:54 2016
@@ -0,0 +1,259 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+/**
+ * {@link MessageFormat} implementation for {@link AMQMessage}s (type "AMQP_0_9_1").
+ * <p>
+ * Encoded layout, as consumed by {@link #createMessage}:
+ * <pre>
+ *   short-string exchange
+ *   short-string routing key
+ *   1 octet flags            (bit 0 = mandatory, bit 1 = immediate)
+ *   4 octet content header size (unsigned)
+ *   content header
+ *   message body
+ * </pre>
+ */
+public class MessageFormat_0_9_1 implements MessageFormat<AMQMessage>
+{
+
+    public static final int AMQP_MESSAGE_FORMAT_0_9_1 = 91;
+    private static final byte MANDATORY_MASK = (byte)1;
+    private static final byte IMMEDIATE_MASK = (byte)2;
+
+    /** Two short-string length octets + one flags octet + four octets of content header size. */
+    private static final int FIXED_HEADER_OVERHEAD = 2 + 1 + 4;
+
+
+    /** @return the plugin type name, {@code "AMQP_0_9_1"} */
+    @Override
+    public String getType()
+    {
+        return "AMQP_0_9_1";
+    }
+
+    /** @return {@link #AMQP_MESSAGE_FORMAT_0_9_1} */
+    @Override
+    public int getSupportedFormat()
+    {
+        return AMQP_MESSAGE_FORMAT_0_9_1;
+    }
+
+    /** @return the message class handled by this format, {@link AMQMessage} */
+    @Override
+    public Class<AMQMessage> getMessageClass()
+    {
+        return AMQMessage.class;
+    }
+
+    /**
+     * Encodes the publish information and content header into one buffer and returns it
+     * followed by the message's content buffers.
+     *
+     * @param message the message to encode
+     * @return the encoded header buffer followed by the content of the message
+     */
+    @Override
+    public List<QpidByteBuffer> convertToMessageFormat(final AMQMessage message)
+    {
+        final MessagePublishInfo messagePublishInfo = message.getMessagePublishInfo();
+        final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+        final AMQShortString exchange = messagePublishInfo.getExchange();
+        final AMQShortString routingKey = messagePublishInfo.getRoutingKey();
+        // NOTE(review): relies on writePayload() emitting exactly getSize() octets - confirm.
+        final int contentHeaderSize = contentHeaderBody.getSize();
+        final int length = contentHeaderSize
+                           + (exchange == null ? 0 : exchange.length())
+                           + (routingKey == null ? 0 : routingKey.length())
+                           + FIXED_HEADER_OVERHEAD;
+        final QpidByteBuffer headerBuf = QpidByteBuffer.allocateDirect(length);
+        EncodingUtils.writeShortStringBytes(headerBuf, exchange);
+        EncodingUtils.writeShortStringBytes(headerBuf, routingKey);
+        // The bit must be SET for a mandatory message: createMessage() decodes
+        // (flags & MANDATORY_MASK) != 0 as mandatory.
+        byte flags = messagePublishInfo.isMandatory() ? MANDATORY_MASK : (byte)0;
+        if(messagePublishInfo.isImmediate())
+        {
+            flags |= IMMEDIATE_MASK;
+        }
+        headerBuf.put(flags);
+        // readContentBody() reads a 4 octet size in front of the content header.
+        headerBuf.putInt(contentHeaderSize);
+        contentHeaderBody.writePayload(headerBuf);
+        // Flip exactly once, after everything has been written, to make the buffer readable.
+        headerBuf.flip();
+
+        List<QpidByteBuffer> bufs = new ArrayList<>();
+        bufs.add(headerBuf);
+        bufs.addAll(message.getContent(0, (int) contentHeaderBody.getBodySize()));
+
+        return bufs;
+    }
+
+    /**
+     * Decodes a message in the layout described on this class and adds it to the store.
+     * The header fields are consumed from the front of {@code buf}; whatever remains in the
+     * buffers afterwards is stored as the message content.
+     *
+     * @param buf                 the encoded message; buffer positions are advanced
+     * @param store               the store to which meta data and content are added
+     * @param connectionReference passed through to the created message
+     * @return the decoded message
+     * @throws ConnectionScopedRuntimeException if the data is truncated or the content header
+     *                                          cannot be decoded
+     */
+    @Override
+    public AMQMessage createMessage(final List<QpidByteBuffer> buf,
+                                    final MessageStore store,
+                                    final Object connectionReference)
+    {
+        try
+        {
+            AMQShortString exchange = readShortString(buf);
+            AMQShortString routingKey = readShortString(buf);
+            byte flags = readByte(buf);
+            final MessagePublishInfo publishBody = new MessagePublishInfo(exchange,
+                                                                          (flags & IMMEDIATE_MASK) != 0,
+                                                                          (flags & MANDATORY_MASK) != 0,
+                                                                          routingKey);
+            final ContentHeaderBody contentHeaderBody = readContentBody(buf);
+            MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody);
+
+            final MessageHandle<MessageMetaData> handle = store.addMessage(mmd);
+            for (QpidByteBuffer content : buf)
+            {
+                if (content.hasRemaining())
+                {
+                    handle.addContent(content);
+                }
+            }
+            final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
+
+            return new AMQMessage(storedMessage, connectionReference);
+        }
+        catch (AMQFrameDecodingException | BufferUnderflowException e )
+        {
+            throw new ConnectionScopedRuntimeException("Error parsing AMQP 0-9-1 message format", e);
+        }
+    }
+
+    /**
+     * Reads the 4 octet (unsigned) content header size followed by the content header itself.
+     */
+    private ContentHeaderBody readContentBody(final List<QpidByteBuffer> buf) throws AMQFrameDecodingException
+    {
+        long size = ((long) readInt(buf)) & 0xffffffffL;
+        final QpidByteBuffer buffer = readByteBuffer(buf, size);
+        final ContentHeaderBody contentHeaderBody = new ContentHeaderBody(buffer, size);
+        buffer.dispose();
+        return contentHeaderBody;
+    }
+
+    /**
+     * Consumes {@code size} octets from the front of {@code data} and returns them as a single
+     * buffer: a view when one source buffer holds them all, otherwise a newly allocated copy.
+     *
+     * @throws BufferUnderflowException if fewer than {@code size} octets are available; this is
+     *                                  checked up front so that an untrusted size is never
+     *                                  used for an allocation
+     */
+    private QpidByteBuffer readByteBuffer(final List<QpidByteBuffer> data, final long size)
+    {
+        long available = 0L;
+        for(QpidByteBuffer buf : data)
+        {
+            available += buf.remaining();
+        }
+        if(size > Integer.MAX_VALUE || size > available)
+        {
+            throw new BufferUnderflowException();
+        }
+        final int required = (int) size;
+
+        QpidByteBuffer result = null;
+        for(QpidByteBuffer buf : data)
+        {
+            if(result == null && buf.remaining()>= required)
+            {
+                final QpidByteBuffer view = buf.view(0, required);
+                // NOTE(review): view() is assumed not to move the source position, so the
+                // octets are consumed explicitly; otherwise they would be stored again as
+                // message content.
+                buf.position(buf.position() + required);
+                return view;
+            }
+            else if(buf.hasRemaining())
+            {
+                if(result == null)
+                {
+                    result = QpidByteBuffer.allocateDirect(required);
+                }
+                if(buf.remaining()>result.remaining())
+                {
+                    final int chunk = result.remaining();
+                    QpidByteBuffer dup = buf.view(0, chunk);
+                    result.put(dup);
+                    dup.dispose();
+                    // consume from the source only the part that was copied
+                    buf.position(buf.position() + chunk);
+                }
+                else
+                {
+                    result.put(buf);
+                }
+                if(!result.hasRemaining())
+                {
+                    result.flip();
+                    return result;
+                }
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Reads a big-endian 32-bit integer, which may be split across several buffers.
+     *
+     * @throws BufferUnderflowException if fewer than four octets are available
+     */
+    private int readInt(final List<QpidByteBuffer> data)
+    {
+        int required = 4;
+        int result = 0;
+        for(QpidByteBuffer buf : data)
+        {
+            if(required == 4 && buf.remaining() >= 4)
+            {
+                return buf.getInt();
+            }
+            else
+            {
+                // assemble the value octet by octet, most significant first
+                while(buf.remaining() > 0)
+                {
+                    result <<= 8;
+                    result |= ((int)buf.get()) & 0xff;
+                    if(--required == 0)
+                    {
+                        return result;
+                    }
+                }
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Reads one octet from the first buffer that still has data.
+     *
+     * @throws BufferUnderflowException if every buffer is exhausted
+     */
+    private byte readByte(final List<QpidByteBuffer> data)
+    {
+        for(QpidByteBuffer buf : data)
+        {
+            if(buf.hasRemaining())
+            {
+                return buf.get();
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Reads a short string (one length octet followed by that many octets), which may be
+     * split across several buffers.
+     *
+     * @throws BufferUnderflowException if the data ends before the string is complete
+     */
+    private AMQShortString readShortString(final List<QpidByteBuffer> data)
+    {
+        for(QpidByteBuffer buf : data)
+        {
+            if(buf.hasRemaining())
+            {
+                int length = ((int)buf.get(buf.position())) & 0xff;
+                if(buf.remaining()>length)
+                {
+                    return AMQShortString.readAMQShortString(buf);
+                }
+                // The string straddles buffers. Only the first non-empty buffer starts with
+                // the length octet, so stop here rather than misreading the first octet of a
+                // later buffer as a length.
+                break;
+            }
+        }
+        int length = ((int)readByte(data)) & 0xff;
+        byte[] octets = new byte[length];
+        readByteArray(octets, data);
+        return new AMQShortString(octets);
+    }
+
+    /**
+     * Fills {@code octets} completely from the front of {@code data}.
+     *
+     * @throws BufferUnderflowException if the buffers hold fewer octets than the array needs
+     */
+    private void readByteArray(final byte[] octets, final List<QpidByteBuffer> data)
+    {
+        int offset = 0;
+        for(QpidByteBuffer buf : data)
+        {
+            final int remaining = buf.remaining();
+            if(remaining >= octets.length-offset)
+            {
+                buf.get(octets, offset, octets.length-offset);
+                return;
+            }
+            else if(remaining > 0)
+            {
+                buf.get(octets, offset, remaining);
+                offset+=remaining;
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Returns the message's initial routing address, with a leading
+     * {@code destinationAddress + "/"} prefix removed when present.
+     *
+     * @param message            the message being routed
+     * @param destinationAddress the address of the destination the message was sent to; may be null
+     * @return the (possibly shortened) initial routing address; null if the message has none
+     */
+    @Override
+    public String getRoutingAddress(final AMQMessage message, final String destinationAddress)
+    {
+        String initialRoutingAddress = message.getInitialRoutingAddress();
+        if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/"))
+        {
+            initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1);
+        }
+        return initialRoutingAddress;
+    }
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageFormat_0_9_1.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Mon Dec 12 14:34:54 2016
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -76,7 +77,8 @@ public class ExchangeDestination impleme
         return OUTCOMES;
     }
 
-    public Outcome send(final Message_1_0 message,
+    public Outcome send(final ServerMessage<?> message,
+                        final String routingAddress,
                         ServerTransaction txn,
                         final Action<MessageInstance> action)
     {
@@ -103,7 +105,6 @@ public class ExchangeDestination impleme
                     return null;
                 }};
 
-        final String routingAddress = getRoutingAddress(message);
         int enqueues = _exchange.send(message,
                                       routingAddress,
                                       instanceProperties,
@@ -133,10 +134,11 @@ public class ExchangeDestination impleme
     }
 
     @Override
-    public void authorizePublish(final SecurityToken securityToken, final Message_1_0 message)
+    public void authorizePublish(final SecurityToken securityToken,
+                                 final String routingAddress)
     {
         _exchange.authorisePublish(securityToken,
-                            Collections.<String,Object>singletonMap("routingKey", getRoutingAddress(message)));
+                            Collections.<String,Object>singletonMap("routingKey", routingAddress));
 
 
     }
@@ -148,27 +150,38 @@ public class ExchangeDestination impleme
         MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
         if(_initialRoutingAddress == null)
         {
-            routingAddress = messageHeader.getSubject();
-            if(routingAddress == null)
+            final String to = messageHeader.getTo();
+            if (to != null
+                && (_exchange.getName() == null || _exchange.getName().trim().equals("")))
             {
-                if (messageHeader.getHeader("routing-key") instanceof String)
-                {
-                    routingAddress = (String) messageHeader.getHeader("routing-key");
-                }
-                else if (messageHeader.getHeader("routing_key") instanceof String)
-                {
-                    routingAddress = (String) messageHeader.getHeader("routing_key");
-                }
-                else if (messageHeader.getTo() != null
-                        && messageHeader.getTo().startsWith(_exchange.getName() + "/"))
-                {
-                    routingAddress = messageHeader.getTo().substring(1+_exchange.getName().length());
-                }
-                else
-                {
-                    routingAddress = "";
-                }
+                routingAddress = to;
             }
+            else if (to != null
+                     && to.startsWith(_exchange.getName() + "/"))
+            {
+                routingAddress = to.substring(1 + _exchange.getName().length());
+            }
+            else if (to != null && !to.equals(_exchange.getName()))
+            {
+                routingAddress = to;
+            }
+            else if (messageHeader.getHeader("routing-key") instanceof String)
+            {
+                routingAddress = (String) messageHeader.getHeader("routing-key");
+            }
+            else if (messageHeader.getHeader("routing_key") instanceof String)
+            {
+                routingAddress = (String) messageHeader.getHeader("routing_key");
+            }
+            else if (messageHeader.getSubject() != null)
+            {
+                routingAddress = messageHeader.getSubject();
+            }
+            else
+            {
+                routingAddress = "";
+            }
+
         }
         else
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Mon Dec 12 14:34:54 2016
@@ -28,6 +28,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -76,8 +77,8 @@ public class NodeReceivingDestination im
         return OUTCOMES;
     }
 
-    public Outcome send(final Message_1_0 message,
-                        ServerTransaction txn,
+    public Outcome send(final ServerMessage<?> message,
+                        final String routingAddress, ServerTransaction txn,
                         final Action<MessageInstance> action)
     {
         final InstanceProperties instanceProperties =
@@ -103,9 +104,6 @@ public class NodeReceivingDestination im
                     return null;
                 }};
 
-        String routingAddress;
-        routingAddress = getRoutingAddress(message);
-
         int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, action);
 
         if(enqueues == 0)
@@ -113,12 +111,11 @@ public class NodeReceivingDestination im
             _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
         }
 
-        return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(message) : ACCEPTED;
+        return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(routingAddress) : ACCEPTED;
     }
 
-    private Outcome createdRejectedOutcome(final Message_1_0 message)
+    private Outcome createdRejectedOutcome(String routingAddress)
     {
-        String routingAddress = getRoutingAddress(message);
         Rejected rejected = new Rejected();
         final Error notFoundError = new Error(AmqpError.NOT_FOUND, "Unknown destination '" + routingAddress + '"');
         rejected.setError(notFoundError);
@@ -132,10 +129,11 @@ public class NodeReceivingDestination im
     }
 
     @Override
-    public void authorizePublish(final SecurityToken securityToken, final Message_1_0 message)
+    public void authorizePublish(final SecurityToken securityToken,
+                                 final String routingAddress)
     {
             _destination.authorisePublish(securityToken,
-                                          Collections.<String, Object>singletonMap("routingKey", getRoutingAddress(message)));
+                                          Collections.<String, Object>singletonMap("routingKey", routingAddress));
 
     }
 
@@ -143,32 +141,39 @@ public class NodeReceivingDestination im
     public String getRoutingAddress(final Message_1_0 message)
     {
         MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
-        String routingAddress = messageHeader.getSubject();
-        if(routingAddress == null)
+        String routingAddress;
+        final String to = messageHeader.getTo();
+        if (to != null
+            && (_destination.getName() == null || _destination.getName().trim().equals("")))
         {
-            if (messageHeader.getHeader("routing-key") instanceof String)
-            {
-                routingAddress = (String) messageHeader.getHeader("routing-key");
-            }
-            else if (messageHeader.getHeader("routing_key") instanceof String)
-            {
-                routingAddress = (String) messageHeader.getHeader("routing_key");
-            }
-            else if (messageHeader.getTo() != null
-                     && messageHeader.getTo().startsWith(_destination.getName() + "/"))
-            {
-                routingAddress = messageHeader.getTo().substring(1+_destination.getName().length());
-            }
-            else if (messageHeader.getTo() != null
-                     && (_destination.getName() == null || _destination.getName().trim().equals("")))
-            {
-                routingAddress = messageHeader.getTo();
-            }
-            else
-            {
-                routingAddress = "";
-            }
+            routingAddress = to;
         }
+        else if (to != null
+                 && to.startsWith(_destination.getName() + "/"))
+        {
+            routingAddress = to.substring(1 + _destination.getName().length());
+        }
+        else if (to != null && !to.equals(_destination.getName()))
+        {
+            routingAddress = to;
+        }
+        else if (messageHeader.getHeader("routing-key") instanceof String)
+        {
+            routingAddress = (String) messageHeader.getHeader("routing-key");
+        }
+        else if (messageHeader.getHeader("routing_key") instanceof String)
+        {
+            routingAddress = (String) messageHeader.getHeader("routing_key");
+        }
+        else if (messageHeader.getSubject() != null)
+        {
+            routingAddress = messageHeader.getSubject();
+        }
+        else
+        {
+            routingAddress = "";
+        }
+
         return routingAddress;
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Mon Dec 12 14:34:54 2016
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -56,8 +57,8 @@ public class QueueDestination extends Me
         return OUTCOMES;
     }
 
-    public Outcome send(final Message_1_0 message,
-                        ServerTransaction txn,
+    public Outcome send(final ServerMessage<?> message,
+                        final String routingAddress, ServerTransaction txn,
                         final Action<MessageInstance> action)
     {
 
@@ -106,11 +107,12 @@ public class QueueDestination extends Me
     }
 
     @Override
-    public void authorizePublish(final SecurityToken securityToken, final Message_1_0 message)
+    public void authorizePublish(final SecurityToken securityToken,
+                                 final String routingAddress)
     {
 
         _queue.authorisePublish(securityToken,
-                                Collections.<String,Object>singletonMap("routingKey", getRoutingAddress(message)));
+                                Collections.<String,Object>emptyMap());
 
 
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Mon Dec 12 14:34:54 2016
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -38,7 +39,10 @@ public interface ReceivingDestination ex
 
     Outcome[] getOutcomes();
 
-    Outcome send(Message_1_0 message, ServerTransaction txn, final Action<MessageInstance> postEnqueueAction);
+    Outcome send(ServerMessage<?> message,
+                 final String routingAddress,
+                 ServerTransaction txn,
+                 final Action<MessageInstance> postEnqueueAction);
 
     int getCredit();
 
@@ -46,5 +50,5 @@ public interface ReceivingDestination ex
 
     String getAddress();
 
-    void authorizePublish(SecurityToken securityToken, final Message_1_0 message);
+    void authorizePublish(SecurityToken securityToken, final String routingAddress);
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Mon Dec 12 14:34:54 2016
@@ -34,7 +34,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.MessageFormat;
+import org.apache.qpid.server.protocol.MessageFormatRegistry;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -106,10 +109,10 @@ public class StandardReceivingLink_1_0 i
 
         org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
         final Binary deliveryTag = xfr.getDeliveryTag();
-
+        UnsignedInteger messageFormat = null;
         if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
         {
-            _incompleteMessage = new ArrayList<Transfer>();
+            _incompleteMessage = new ArrayList<>();
             _incompleteMessage.add(xfr);
             _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
             _messageDeliveryTag = deliveryTag;
@@ -128,6 +131,10 @@ public class StandardReceivingLink_1_0 i
 
             for(Transfer t : _incompleteMessage)
             {
+                if(t.getMessageFormat() != null && messageFormat == null)
+                {
+                    messageFormat = t.getMessageFormat();
+                }
                 fragments.addAll(t.getPayload());
                 t.dispose();
             }
@@ -139,6 +146,8 @@ public class StandardReceivingLink_1_0 i
             _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
             _messageDeliveryTag = deliveryTag;
             fragments = xfr.getPayload();
+            messageFormat = xfr.getMessageFormat();
+
             xfr.dispose();
         }
 
@@ -161,21 +170,45 @@ public class StandardReceivingLink_1_0 i
         }
         else
         {
-            List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
-            MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
-            MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
+            ServerMessage<?> serverMessage;
+            String routingAddress;
 
-            for(EncodingRetainingSection<?> dataSection : dataSections)
+            if(messageFormat == null || UnsignedInteger.ZERO.equals(messageFormat))
             {
-                for (QpidByteBuffer buf : dataSection.getEncodedForm())
+                List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
+
+                MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
+                MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
+
+                for (EncodingRetainingSection<?> dataSection : dataSections)
                 {
-                    handle.addContent(buf);
-                    buf.dispose();
+                    for (QpidByteBuffer buf : dataSection.getEncodedForm())
+                    {
+                        handle.addContent(buf);
+                        buf.dispose();
+                    }
+                }
+                final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
+                Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
+                routingAddress = _destination.getRoutingAddress(message);
+                serverMessage = message;
+            }
+            else
+            {
+                MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
+                if(format != null)
+                {
+                    serverMessage = format.createMessage(fragments, _addressSpace.getMessageStore(), getSession().getConnection().getReference());
+                    routingAddress = format.getRoutingAddress(serverMessage, _destination.getAddress());
+                }
+                else
+                {
+                    final Error err = new Error();
+                    err.setCondition(AmqpError.NOT_IMPLEMENTED);
+                    err.setDescription("Unknown message format: " + messageFormat);
+                    return err;
                 }
             }
-            final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
-            Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
-
 
             for(QpidByteBuffer fragment: fragments)
             {
@@ -183,7 +216,7 @@ public class StandardReceivingLink_1_0 i
             }
             fragments = null;
 
-            MessageReference<Message_1_0> reference = message.newReference();
+            MessageReference<?> reference = serverMessage.newReference();
             try
             {
                 Binary transactionId = null;
@@ -213,10 +246,10 @@ public class StandardReceivingLink_1_0 i
                     Session_1_0 session = getSession();
 
                     session.getAMQPConnection()
-                            .checkAuthorizedMessagePrincipal(message.getMessageHeader().getUserId());
-                    _destination.authorizePublish(session.getSecurityToken(), message);
+                            .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
+                    _destination.authorizePublish(session.getSecurityToken(), routingAddress);
 
-                    Outcome outcome = _destination.send(message, transaction, session.getCapacityCheckAction());
+                    Outcome outcome = _destination.send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
                     Source source = (Source) getEndpoint().getSource();
 
                     DeliveryState resultantState;
@@ -261,7 +294,7 @@ public class StandardReceivingLink_1_0 i
                     getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
 
                     getSession().getAMQPConnection()
-                            .registerMessageReceived(message.getSize(), message.getArrivalTime());
+                            .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
 
                     if (!(transaction instanceof AutoCommitTransaction))
                     {

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Mon Dec 12 14:34:54 2016
@@ -40,16 +40,13 @@ public class ContentHeaderPropertiesFact
              throws AMQFrameDecodingException
     {
         BasicContentHeaderProperties properties;
-        // AMQP version change: "Hardwired" version to major=8, minor=0
-        // TODO: Change so that the actual version is obtained from
-        // the ProtocolInitiation object for this session.
         if (classId == BasicConsumeBody.CLASS_ID)
         {
         	properties = new BasicContentHeaderProperties();
         }
         else
         {
-        	throw new AMQFrameDecodingException("Unsupport content header class id: " + classId, null);
+        	throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
         }
         properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
         return properties;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java?rev=1773795&r1=1773794&r2=1773795&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/MessagePublishInfo.java Mon Dec 12 14:34:54 2016
@@ -55,9 +55,9 @@ public final class MessagePublishInfo
         return _immediate;
     }
 
-    public void setImmediate(boolean immedate)
+    public void setImmediate(boolean immediate)
     {
-        _immediate = immedate;
+        _immediate = immediate;
     }
 
     public boolean isMandatory()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org