You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/03/18 23:19:54 UTC

svn commit: r1458042 [2/6] - in /qpid/branches/QPID-4659/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/src/test/java/org/apache/qpid/ser...

Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,243 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;import java.util.Map;import org.apache.qpid.AMQPInvalidClassException;import org.apache.qpid.exchange.ExchangeDefaults;import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exchange.Exchange;import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;import org.apache.qpid.transport.Header;import org.apache.qpid.transport.MessageDeliveryMode;import org.apache.qpid.transport.MessageProperties;import org.apache.qpid.transport.ReplyTo;
+
+/**
+ * Converts AMQP 0-10 {@link MessageTransferMessage}s into AMQP 0-8 {@link AMQMessage}s.
+ * <p>
+ * The converted message is a view over the original: the 0-8 meta data is built once at
+ * conversion time, while content reads are delegated to the source message.
+ */
+public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage>
+{
+    // Class id handed to the ContentHeaderBody constructor (presumably the AMQP "basic" class - confirm).
+    private static final int BASIC_CLASS_ID = 60;
+
+    /**
+     * Maps the 0-10 delivery and message properties of a message onto 0-8 basic content header
+     * properties. Only properties that are flagged as present on the source message are copied.
+     *
+     * @param messageTransferMessage the 0-10 message whose header is read
+     * @param vhost virtual host whose exchange registry is consulted for the type of the reply-to exchange
+     * @return the populated 0-8 content header properties
+     */
+    public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage,
+                                                                              VirtualHost vhost)
+    {
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+        Header header = messageTransferMessage.getHeader();
+        DeliveryProperties deliveryProps = header.getDeliveryProperties();
+        MessageProperties messageProps = header.getMessageProperties();
+
+        if(deliveryProps != null)
+        {
+            if(deliveryProps.hasDeliveryMode())
+            {
+                props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT));
+            }
+            if(deliveryProps.hasExpiration())
+            {
+                props.setExpiration(deliveryProps.getExpiration());
+            }
+            if(deliveryProps.hasPriority())
+            {
+                props.setPriority((byte)deliveryProps.getPriority().getValue());
+            }
+            if(deliveryProps.hasTimestamp())
+            {
+                props.setTimestamp(deliveryProps.getTimestamp());
+            }
+        }
+        if(messageProps != null)
+        {
+            if(messageProps.hasAppId())
+            {
+                props.setAppId(new AMQShortString(messageProps.getAppId()));
+            }
+            if(messageProps.hasContentType())
+            {
+                props.setContentType(messageProps.getContentType());
+            }
+            if(messageProps.hasCorrelationId())
+            {
+                props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+            }
+            if(messageProps.hasContentEncoding())
+            {
+                props.setEncoding(messageProps.getContentEncoding());
+            }
+            if(messageProps.hasMessageId())
+            {
+                props.setMessageId("ID:" + messageProps.getMessageId().toString());
+            }
+            if(messageProps.hasReplyTo())
+            {
+                ReplyTo replyTo = messageProps.getReplyTo();
+                String exchangeName = replyTo.getExchange();
+                String routingKey = replyTo.getRoutingKey();
+                if(exchangeName == null)
+                {
+                    exchangeName = "";
+                }
+
+                // An exchange that is not registered on the virtual host is described as a direct exchange.
+                Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+                String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString();
+
+                // The routing key value is always wrapped in a matching pair of quotes, even when
+                // it is absent; previously a null routing key left the opening quote unterminated.
+                props.setReplyTo(exchangeClass + "://" + exchangeName + "//?routingkey='"
+                                 + (routingKey == null ? "" : routingKey) + "'");
+
+            }
+            if(messageProps.hasUserId())
+            {
+                props.setUserId(new AMQShortString(messageProps.getUserId()));
+            }
+
+            if(messageProps.hasApplicationHeaders())
+            {
+                // Work on a copy so that removing "x-jms-type" does not alter the source message.
+                Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
+                if(appHeaders.containsKey("x-jms-type"))
+                {
+                    props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
+                }
+
+                FieldTable ft = new FieldTable();
+                for(Map.Entry<String, Object> entry : appHeaders.entrySet())
+                {
+                    try
+                    {
+                        ft.put(new AMQShortString(entry.getKey()), entry.getValue());
+                    }
+                    catch(AMQPInvalidClassException ignored)
+                    {
+                        // TODO
+                        // log here, but ignore - the value just can't be converted, so the header is dropped
+                    }
+                }
+                props.setHeaders(ft);
+
+            }
+        }
+
+        return props;
+    }
+
+    /** @return the message class this converter accepts, {@link MessageTransferMessage} */
+    @Override
+    public Class<MessageTransferMessage> getInputClass()
+    {
+        return MessageTransferMessage.class;
+    }
+
+    /** @return the message class this converter produces, {@link AMQMessage} */
+    @Override
+    public Class<AMQMessage> getOutputClass()
+    {
+        return AMQMessage.class;
+    }
+
+    /**
+     * Wraps the given 0-10 message in a 0-8 {@link AMQMessage}.
+     *
+     * @param message the 0-10 message to convert
+     * @param vhost virtual host used while converting the reply-to address
+     * @return a 0-8 message backed by the content of {@code message}
+     */
+    @Override
+    public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost)
+    {
+        return new AMQMessage(convertToStoredMessage(message, vhost));
+    }
+
+    /**
+     * Creates a read-only {@link StoredMessage} whose meta data is the converted 0-8 meta data
+     * and whose content and message number are delegated to the original message.
+     * {@code addContent} and {@code remove} are unsupported.
+     */
+    private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message, VirtualHost vhost)
+    {
+        final MessageMetaData metaData = convertMetaData(message, vhost);
+        return new StoredMessage<MessageMetaData>()
+        {
+            @Override
+            public MessageMetaData getMetaData()
+            {
+                return metaData;
+            }
+
+            @Override
+            public long getMessageNumber()
+            {
+                return message.getMessageNumber();
+            }
+
+            @Override
+            public void addContent(int offsetInMessage, ByteBuffer src)
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getContent(int offsetInMessage, ByteBuffer dst)
+            {
+                // note the swapped argument order on the underlying message
+                return message.getContent(dst, offsetInMessage);
+            }
+
+            @Override
+            public ByteBuffer getContent(int offsetInMessage, int size)
+            {
+                return message.getContent(offsetInMessage,size);
+            }
+
+            @Override
+            public StoreFuture flushToStore()
+            {
+                return StoreFuture.IMMEDIATE_FUTURE;
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /** Builds the 0-8 meta data from the converted publish info and content header plus the original arrival time. */
+    private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost)
+    {
+        return new MessageMetaData(convertPublishBody(message), convertContentHeaderBody(message, vhost), 1, message.getArrivalTime());
+    }
+
+    /** Builds the content header body from the converted properties and sets its body size to the message size. */
+    private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost)
+    {
+        BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost);
+        ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+        chb.setBodySize(message.getSize());
+        return chb;
+    }
+
+    /**
+     * Derives immutable 0-8 publish info from the 0-10 delivery properties. Without delivery
+     * properties the exchange and routing key are {@code null} and both flags are {@code false};
+     * otherwise "mandatory" is the negation of the 0-10 "discard unroutable" flag.
+     */
+    private MessagePublishInfo convertPublishBody(MessageTransferMessage message)
+    {
+        DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+        final AMQShortString exchangeName =(delvProps == null || delvProps.getExchange() == null)
+                ? null
+                : new AMQShortString(delvProps.getExchange());
+        final AMQShortString routingKey = (delvProps == null || delvProps.getRoutingKey() == null)
+                ? null
+                : new AMQShortString(delvProps.getRoutingKey());
+        final boolean immediate = delvProps != null && delvProps.getImmediate();
+        final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable();
+
+        return new MessagePublishInfo()
+        {
+            @Override
+            public AMQShortString getExchange()
+            {
+                return exchangeName;
+            }
+
+            @Override
+            public void setExchange(AMQShortString exchange)
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            @Override
+            public boolean isMandatory()
+            {
+                return mandatory;
+            }
+
+            @Override
+            public AMQShortString getRoutingKey()
+            {
+                return routingKey;
+            }
+        };
+    }
+}

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,200 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
+
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.url.AMQBindingURL;
+
+/**
+ * Converts AMQP 0-8 {@link AMQMessage}s into AMQP 0-10 {@link MessageTransferMessage}s.
+ * <p>
+ * The 0-10 meta data is built once at conversion time; content reads are delegated to the
+ * original 0-8 message.
+ */
+public class MessageConverter_0_8_to_0_10  implements MessageConverter<AMQMessage, MessageTransferMessage>
+{
+    /** @return the message class this converter accepts, {@link AMQMessage} */
+    @Override
+    public Class<AMQMessage> getInputClass()
+    {
+        return AMQMessage.class;
+    }
+
+    /** @return the message class this converter produces, {@link MessageTransferMessage} */
+    @Override
+    public Class<MessageTransferMessage> getOutputClass()
+    {
+        return MessageTransferMessage.class;
+    }
+
+    /**
+     * Wraps the given 0-8 message in a 0-10 {@link MessageTransferMessage}.
+     *
+     * @param message_0_8 the 0-8 message to convert
+     * @param vhost not used by this converter
+     * @return a 0-10 message backed by the content of {@code message_0_8}
+     */
+    @Override
+    public MessageTransferMessage convert(AMQMessage message_0_8, VirtualHost vhost)
+    {
+        return new MessageTransferMessage(convertToStoredMessage(message_0_8), null);
+    }
+
+    /**
+     * Creates a read-only {@link StoredMessage} whose meta data is the converted 0-10 meta data
+     * and whose content and message number are delegated to the original message.
+     * {@code addContent} and {@code remove} are unsupported.
+     */
+    private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final AMQMessage message_0_8)
+    {
+        final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(message_0_8);
+        return new StoredMessage<MessageMetaData_0_10>()
+        {
+            @Override
+            public MessageMetaData_0_10 getMetaData()
+            {
+                return messageMetaData_0_10;
+            }
+
+            @Override
+            public long getMessageNumber()
+            {
+                return message_0_8.getMessageNumber();
+            }
+
+            @Override
+            public void addContent(int offsetInMessage, ByteBuffer src)
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getContent(int offsetInMessage, ByteBuffer dst)
+            {
+                // note the swapped argument order on the underlying message
+                return message_0_8.getContent(dst, offsetInMessage);
+            }
+
+            @Override
+            public ByteBuffer getContent(int offsetInMessage, int size)
+            {
+                return message_0_8.getContent(offsetInMessage, size);
+            }
+
+            @Override
+            public StoreFuture flushToStore()
+            {
+                return StoreFuture.IMMEDIATE_FUTURE;
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * Builds the 0-10 meta data (delivery properties, message properties, size and arrival
+     * time) from the 0-8 publish info and basic content header properties.
+     * <p>
+     * Only the header is produced here, so the message body is not read: the previous
+     * implementation copied the whole body into a temporary buffer that was never used.
+     */
+    private MessageMetaData_0_10 convertMetaData(AMQMessage message_0_8)
+    {
+        DeliveryProperties deliveryProps = new DeliveryProperties();
+        MessageProperties messageProps = new MessageProperties();
+
+        int size = (int) message_0_8.getSize();
+
+        BasicContentHeaderProperties properties =
+                (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+
+        final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
+        if(exchange != null)
+        {
+            deliveryProps.setExchange(exchange.toString());
+        }
+
+        // NOTE(review): the delivery mode (persistence) of the 0-8 message is not copied onto the
+        // delivery properties here - confirm whether that is intentional.
+        deliveryProps.setExpiration(message_0_8.getExpiration());
+        deliveryProps.setImmediate(message_0_8.isImmediate());
+        deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
+        deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+        deliveryProps.setTimestamp(properties.getTimestamp());
+
+        messageProps.setContentEncoding(properties.getEncodingAsString());
+        messageProps.setContentLength(size);
+        if(properties.getAppId() != null)
+        {
+            messageProps.setAppId(properties.getAppId().getBytes());
+        }
+        messageProps.setContentType(properties.getContentTypeAsString());
+        if(properties.getCorrelationId() != null)
+        {
+            messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
+        }
+
+        if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+        {
+            String origReplyToString = properties.getReplyTo().asString();
+            ReplyTo replyTo = new ReplyTo();
+            // if the string looks like a binding URL, then attempt to parse it...
+            try
+            {
+                AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+                AMQShortString routingKey = burl.getRoutingKey();
+                if(routingKey != null)
+                {
+                    replyTo.setRoutingKey(routingKey.asString());
+                }
+
+                AMQShortString exchangeName = burl.getExchangeName();
+                if(exchangeName != null)
+                {
+                    replyTo.setExchange(exchangeName.asString());
+                }
+            }
+            catch (URISyntaxException e)
+            {
+                // ...otherwise fall back to using the whole string as the routing key
+                replyTo.setRoutingKey(origReplyToString);
+            }
+            messageProps.setReplyTo(replyTo);
+
+        }
+
+        if(properties.getMessageId() != null)
+        {
+            try
+            {
+                // strip the "ID:" prefix, if present, before parsing the remainder as a UUID
+                String messageIdAsString = properties.getMessageIdAsString();
+                if(messageIdAsString.startsWith("ID:"))
+                {
+                    messageIdAsString = messageIdAsString.substring(3);
+                }
+                UUID uuid = UUID.fromString(messageIdAsString);
+                messageProps.setMessageId(uuid);
+            }
+            catch(IllegalArgumentException e)
+            {
+                // ignore - can't parse, so the converted message carries no message id
+            }
+        }
+
+        if(properties.getUserId() != null)
+        {
+            messageProps.setUserId(properties.getUserId().getBytes());
+        }
+
+        FieldTable fieldTable = properties.getHeaders();
+
+        Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+        // the 0-8 "type" property is carried as the "x-jms-type" application header
+        if(properties.getType() != null)
+        {
+            appHeaders.put("x-jms-type", properties.getTypeAsString());
+        }
+
+        messageProps.setApplicationHeaders(appHeaders);
+
+        Header header = new Header(deliveryProps, messageProps, null);
+
+        return new MessageMetaData_0_10(header, size, message_0_8.getArrivalTime());
+    }
+}

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,93 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
+import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+
+/**
+ * Converts AMQP 0-8 {@link AMQMessage}s to AMQP 1.0 by supplying the 1.0 meta data sections
+ * (header, properties and application properties) to {@link MessageConverter_to_1_0}.
+ */
+public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
+{
+    /** @return the message class this converter accepts, {@link AMQMessage} */
+    @Override
+    public Class<AMQMessage> getInputClass()
+    {
+        return AMQMessage.class;
+    }
+
+    /**
+     * Builds the 1.0 meta data for a 0-8 message from three sections, in this order: header,
+     * properties, application properties.
+     *
+     * @param serverMessage the 0-8 message to describe
+     * @param sectionEncoder encoder handed on to the created {@link MessageMetaData_1_0}
+     * @return the 1.0 meta data for {@code serverMessage}
+     */
+    protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder)
+    {
+
+        List<Section> sections = new ArrayList<Section>(3);
+
+        Header header = new Header();
+
+        header.setDurable(serverMessage.isPersistent());
+
+        BasicContentHeaderProperties contentHeader =
+                (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+
+        header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
+        final long expiration = serverMessage.getExpiration();
+        final long arrivalTime = serverMessage.getArrivalTime();
+
+        // the ttl is only set when the expiration lies after the arrival time
+        if(expiration > arrivalTime)
+        {
+            header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
+        }
+        sections.add(header);
+
+
+        Properties props = new Properties();
+
+        props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
+
+        final String contentType = contentHeader.getContentTypeAsString();
+        props.setContentType(Symbol.valueOf(contentType));
+
+        // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client.
+        // The strings are compared by value so the check does not depend on Symbol instances being canonical.
+        if("application/java-object-stream".equals(contentType))
+        {
+            props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+        }
+
+        final AMQShortString correlationId = contentHeader.getCorrelationId();
+        if(correlationId != null)
+        {
+            props.setCorrelationId(new Binary(correlationId.getBytes()));
+        }
+        //        props.setCreationTime();
+        //        props.setGroupId();
+        //        props.setGroupSequence();
+        final AMQShortString messageId = contentHeader.getMessageId();
+        if(messageId != null)
+        {
+            props.setMessageId(new Binary(messageId.getBytes()));
+        }
+
+        // Only set reply-to when one is present; String.valueOf(null) would otherwise
+        // produce the literal text "null" as the reply-to address.
+        final AMQShortString replyTo = contentHeader.getReplyTo();
+        if(replyTo != null)
+        {
+            props.setReplyTo(replyTo.toString());
+        }
+
+        //        props.setReplyToGroupId();
+        props.setSubject(serverMessage.getRoutingKey());
+        //        props.setTo();
+        if(contentHeader.getUserId() != null)
+        {
+            props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
+        }
+        sections.add(props);
+
+        sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));
+
+        return new MessageMetaData_1_0(sections, sectionEncoder);
+    }
+
+}

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,27 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+/**
+ * Placeholder for the AMQP 1.0 to AMQP 0-8 message converter.
+ * The input and output classes are declared, but the conversion itself is not implemented yet.
+ */
+public class MessageConverter_1_0_to_0_8 implements MessageConverter<Message_1_0, AMQMessage>
+{
+    /** @return the message class this converter accepts, {@link Message_1_0} */
+    @Override
+    public Class<Message_1_0> getInputClass()
+    {
+        return Message_1_0.class;
+    }
+
+    /** @return the message class this converter is declared to produce, {@link AMQMessage} */
+    @Override
+    public Class<AMQMessage> getOutputClass()
+    {
+        return AMQMessage.class;
+    }
+
+    /**
+     * Not implemented: currently always returns {@code null}, whatever the arguments.
+     *
+     * @param message the 1.0 message (ignored)
+     * @param vhost the virtual host (ignored)
+     * @return always {@code null}
+     */
+    @Override
+    public AMQMessage convert(Message_1_0 message, VirtualHost vhost)
+    {
+        return null;  //TODO - Implement
+    }
+}

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Mon Mar 18 22:19:50 2013
@@ -18,10 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_10;
 
 
-public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {
     private volatile long _bytesCredit;
     private volatile long _messageCredit;
@@ -63,7 +63,7 @@ public class CreditCreditManager extends
     {
     }
 
-    
+
     public synchronized void addCredit(final long messageCredit, final long bytesCredit)
     {
         boolean notifyIncrease = true;

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Mon Mar 18 22:19:50 2013
@@ -18,12 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
 
 
 class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.flow.FlowCreditManager;
 
 public interface FlowCreditManager_0_10 extends FlowCreditManager
 {

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Mon Mar 18 22:19:50 2013
@@ -18,12 +18,12 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.protocol.v0_10.ServerSession;
 
 class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
 {

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Mon Mar 18 22:19:50 2013
@@ -19,10 +19,9 @@
  *
  */
 
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.transport.Method;
 
 public class MessageAcceptCompletionListener implements Method.CompletionListener

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.store.StoredMessage;
+
+public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMetaData_0_10> // MessageMetaDataType implementation whose metadata class is MessageMetaData_0_10
+{
+
+    @Override
+    public int ordinal()
+    {
+        return 1; // fixed identifier for this type; NOTE(review): presumably the value previously carried by MessageMetaDataType.META_DATA_0_10 - confirm before changing
+    }
+
+    @Override
+    public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
+    {
+        return MessageMetaData_0_10.FACTORY.createMetaData(buf); // delegates to the factory exposed by MessageMetaData_0_10
+    }
+
+    @Override
+    public ServerMessage<MessageMetaData_0_10> createMessage(StoredMessage<MessageMetaData_0_10> msg)
+    {
+        return new MessageTransferMessage(msg, null); // wraps the stored message; second argument is null - NOTE(review): presumably the connection reference, confirm
+    }
+
+    public int hashCode() // overrides Object.hashCode (no @Override annotation in this commit)
+    {
+        return ordinal(); // same value for every instance, consistent with equals() in this class
+    }
+
+    public boolean equals(Object o) // overrides Object.equals (no @Override annotation in this commit)
+    {
+        return o != null && o.getClass() == getClass(); // any non-null object of exactly this class is equal, so all instances compare equal
+    }
+}

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,10 +18,12 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
@@ -38,6 +40,8 @@ import java.util.List;
 
 public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
 {
+    private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10();
+
     private Header _header;
     private DeliveryProperties _deliveryProps;
     private MessageProperties _messageProps;
@@ -58,7 +62,7 @@ public class MessageMetaData_0_10 implem
         this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
     }
 
-    private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
+    public MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
     {
         _header = header;
         if(_header != null)
@@ -81,7 +85,7 @@ public class MessageMetaData_0_10 implem
 
     public MessageMetaDataType getType()
     {
-        return MessageMetaDataType.META_DATA_0_10;
+        return TYPE;
     }
 
     public int getStorableSize()

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java Mon Mar 18 22:19:50 2013
@@ -18,9 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
 
 import java.util.*;
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Mon Mar 18 22:19:50 2013
@@ -18,9 +18,13 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.transport.Header;
 

Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator // builds ProtocolEngine_0_10 instances and reports AmqpProtocolVersion.v0_10
+{
+
+    private static final byte[] AMQP_0_10_HEADER = // bytes 'A','M','Q','P',1,1,0,10; NOTE(review): presumably the header a 0-10 client sends first - confirm
+            new byte[] { (byte) 'A',
+                         (byte) 'M',
+                         (byte) 'Q',
+                         (byte) 'P',
+                         (byte) 1,
+                         (byte) 1,
+                         (byte) 0,
+                         (byte) 10
+            };
+
+
+    public ProtocolEngineCreator_0_10() // no state to initialise
+    {
+    }
+
+    public AmqpProtocolVersion getVersion()
+    {
+        return AmqpProtocolVersion.v0_10;
+    }
+
+
+    public byte[] getHeaderIdentifier()
+    {
+        return AMQP_0_10_HEADER; // returns the shared array itself, not a copy - callers must not modify it
+    }
+
+    public ServerProtocolEngine newProtocolEngine(Broker broker,
+                                                  NetworkConnection network,
+                                                  long id)
+    {
+        String fqdn = null; // stays null unless the local address is an InetSocketAddress
+        SocketAddress address = network.getLocalAddress();
+        if (address instanceof InetSocketAddress)
+        {
+            fqdn = ((InetSocketAddress) address).getHostName(); // host name taken from the connection's local address
+        }
+        final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker,
+                fqdn, broker.getSubjectCreator(address)); // subject creator is obtained from the broker using the same local address
+
+        ServerConnection conn = new ServerConnection(id); // connection is created with the id supplied by the caller
+
+        conn.setConnectionDelegate(connDelegate);
+        conn.setRemoteAddress(network.getRemoteAddress());
+        conn.setLocalAddress(network.getLocalAddress());
+        return new ProtocolEngine_0_10( conn, network); // engine wraps the configured connection and the network connection
+    }
+
+
+    private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_10(); // shared instance returned by getInstance(); NOTE(review): field is not declared final
+
+    public static ProtocolEngineCreator getInstance()
+    {
+        return INSTANCE;
+    }
+}

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,11 +18,11 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.protocol.v0_10.ServerConnection;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
 
 import java.net.SocketAddress;
 import java.security.Principal;
@@ -463,7 +463,13 @@ public class ServerConnection extends Co
 
     public String getPrincipalAsString()
     {
-        return getAuthorizedPrincipal().getName();
+        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
+    }
+
+    @Override
+    public String getVirtualHostName()
+    {
+        return _virtualHost.getName();
     }
 
     public long getSessionCountLimit()

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -39,7 +39,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.State;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
 
 import java.security.Principal;
 import java.text.MessageFormat;
@@ -51,9 +51,9 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -63,7 +63,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.DistributedTransaction;

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.log4j.Logger;
 
@@ -34,11 +34,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.HeadersExchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.flow.FlowCreditManager_0_10;
-import org.apache.qpid.server.flow.WindowCreditManager;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
-import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -50,8 +46,6 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.DtxNotSelectedException;
 import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -251,14 +245,15 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
-                    Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
-                                                                  destination,
-                                                                  method.getAcceptMode(),
-                                                                  method.getAcquireMode(),
-                                                                  MessageFlowMode.WINDOW,
-                                                                  creditManager,
-                                                                  filterManager,
-                                                                  method.getArguments());
+                    Subscription_0_10 sub =
+                            new Subscription_0_10((ServerSession) session,
+                                    destination,
+                                    method.getAcceptMode(),
+                                    method.getAcquireMode(),
+                                    MessageFlowMode.WINDOW,
+                                    creditManager,
+                                    filterManager,
+                                    method.getArguments());
 
                     ((ServerSession)session).register(destination, sub);
                     try

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,33 +18,39 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
 
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.flow.CreditCreditManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.FlowCreditManager_0_10;
-import org.apache.qpid.server.flow.WindowCreditManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.InboundMessageAdapter;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.transport.DeliveryProperties;
@@ -52,35 +58,16 @@ import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageFlowMode;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.ReplyTo;
 import org.apache.qpid.transport.Struct;
-import org.apache.qpid.url.AMQBindingURL;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
 
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
 {
     private final long _subscriptionID;
@@ -137,9 +124,9 @@ public class Subscription_0_10 implement
                              MessageAcquireMode acquireMode,
                              MessageFlowMode flowMode,
                              FlowCreditManager_0_10 creditManager,
-                             FilterManager filters,Map<String, Object> arguments, long subscriptionId)
+                             FilterManager filters,Map<String, Object> arguments)
     {
-        _subscriptionID = subscriptionId;
+        _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
         _session = session;
         _postIdSettingAction = new AddMessageDispositionListenerAction(session);
         _destination = destination;
@@ -201,7 +188,7 @@ public class Subscription_0_10 implement
     {
         return _destination;
     }
-    
+
     public boolean isSuspended()
     {
         return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
@@ -219,16 +206,26 @@ public class Subscription_0_10 implement
             return false;
         }
 
-        if (_noLocal && entry.getMessage() instanceof MessageTransferMessage)
+        if(entry.getMessage() instanceof MessageTransferMessage)
+        {
+            if (_noLocal)
+            {
+                Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
+                if (connectionRef != null && connectionRef == _session.getReference())
+                {
+                    return false;
+                }
+            }
+        }
+        else
         {
-            Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
-            if (connectionRef != null && connectionRef == _session.getReference())
+            // No interest in messages we can't convert into MessageTransferMessages
+            if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(),MessageTransferMessage.class)==null)
             {
                 return false;
             }
         }
 
-
         return checkFilters(entry);
 
 
@@ -357,201 +354,71 @@ public class Subscription_0_10 implement
 
         DeliveryProperties deliveryProps;
         MessageProperties messageProps = null;
+        MessageTransferMessage msg;
 
         if(serverMsg instanceof MessageTransferMessage)
         {
 
-            MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
-            DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
-            messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
-
-            deliveryProps = new DeliveryProperties();
-            if(origDeliveryProps != null)
-            {
-                if(origDeliveryProps.hasDeliveryMode())
-                {
-                    deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
-                }
-                if(origDeliveryProps.hasExchange())
-                {
-                    deliveryProps.setExchange(origDeliveryProps.getExchange());
-                }
-                if(origDeliveryProps.hasExpiration())
-                {
-                    deliveryProps.setExpiration(origDeliveryProps.getExpiration());
-                }
-                if(origDeliveryProps.hasPriority())
-                {
-                    deliveryProps.setPriority(origDeliveryProps.getPriority());
-                }
-                if(origDeliveryProps.hasRoutingKey())
-                {
-                    deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
-                }
-                if(origDeliveryProps.hasTimestamp())
-                {
-                    deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
-                }
-                if(origDeliveryProps.hasTtl())
-                {
-                    deliveryProps.setTtl(origDeliveryProps.getTtl());
-                }
-
-
-            }
-
-            deliveryProps.setRedelivered(entry.isRedelivered());
-
-            if(_trace != null && messageProps == null)
-            {
-                messageProps = new MessageProperties();
-            }
-
-            Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
-
-
-            xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
-                        : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+            msg = (MessageTransferMessage) serverMsg;
         }
-        else if(serverMsg instanceof AMQMessage)
+        else
         {
-            AMQMessage message_0_8 = (AMQMessage) serverMsg;
-            deliveryProps = new DeliveryProperties();
-            messageProps = new MessageProperties();
+            final MessageConverter converter =
+                    MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
+
+            msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost());
+        }
 
-            int size = (int) message_0_8.getSize();
-            ByteBuffer body = ByteBuffer.allocate(size);
-            message_0_8.getContent(body, 0);
-            body.flip();
+        DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+        messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
 
-            BasicContentHeaderProperties properties =
-                    (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
-            final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
-            if(exchange != null)
+        deliveryProps = new DeliveryProperties();
+        if(origDeliveryProps != null)
+        {
+            if(origDeliveryProps.hasDeliveryMode())
             {
-                deliveryProps.setExchange(exchange.toString());
+                deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
             }
-            deliveryProps.setExpiration(message_0_8.getExpiration());
-            deliveryProps.setImmediate(message_0_8.isImmediate());
-            deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
-            deliveryProps.setRedelivered(entry.isRedelivered());
-            deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
-            deliveryProps.setTimestamp(properties.getTimestamp());
-
-            messageProps.setContentEncoding(properties.getEncodingAsString());
-            messageProps.setContentLength(size);
-            if(properties.getAppId() != null)
+            if(origDeliveryProps.hasExchange())
             {
-                messageProps.setAppId(properties.getAppId().getBytes());
+                deliveryProps.setExchange(origDeliveryProps.getExchange());
             }
-            messageProps.setContentType(properties.getContentTypeAsString());
-            if(properties.getCorrelationId() != null)
+            if(origDeliveryProps.hasExpiration())
             {
-                messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
+                deliveryProps.setExpiration(origDeliveryProps.getExpiration());
             }
-
-            if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+            if(origDeliveryProps.hasPriority())
             {
-                String origReplyToString = properties.getReplyTo().asString();
-                ReplyTo replyTo = new ReplyTo();
-                // if the string looks like a binding URL, then attempt to parse it...
-                try
-                {
-                    AMQBindingURL burl = new AMQBindingURL(origReplyToString);
-                    AMQShortString routingKey = burl.getRoutingKey();
-                    if(routingKey != null)
-                    {
-                        replyTo.setRoutingKey(routingKey.asString());
-                    }
-
-                    AMQShortString exchangeName = burl.getExchangeName();
-                    if(exchangeName != null)
-                    {
-                        replyTo.setExchange(exchangeName.asString());
-                    }
-                }
-                catch (URISyntaxException e)
-                {
-                    replyTo.setRoutingKey(origReplyToString);
-                }
-                messageProps.setReplyTo(replyTo);
-
+                deliveryProps.setPriority(origDeliveryProps.getPriority());
             }
-
-            if(properties.getMessageId() != null)
+            if(origDeliveryProps.hasRoutingKey())
             {
-                try
-                {
-                    String messageIdAsString = properties.getMessageIdAsString();
-                    if(messageIdAsString.startsWith("ID:"))
-                    {
-                        messageIdAsString = messageIdAsString.substring(3);
-                    }
-                    UUID uuid = UUID.fromString(messageIdAsString);
-                    messageProps.setMessageId(uuid);
-                }
-                catch(IllegalArgumentException e)
-                {
-                    // ignore - can't parse
-                }
+                deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
             }
-
-
-
-            if(properties.getUserId() != null)
+            if(origDeliveryProps.hasTimestamp())
             {
-                messageProps.setUserId(properties.getUserId().getBytes());
+                deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
             }
-
-            FieldTable fieldTable = properties.getHeaders();
-
-            Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
-
-            if(properties.getType() != null)
+            if(origDeliveryProps.hasTtl())
             {
-                appHeaders.put("x-jms-type", properties.getTypeAsString());
+                deliveryProps.setTtl(origDeliveryProps.getTtl());
             }
 
 
-            messageProps.setApplicationHeaders(appHeaders);
-
-            Header header = new Header(deliveryProps, messageProps, null);
-            xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
-                        : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
         }
-        else
-        {
-
-            deliveryProps = new DeliveryProperties();
-            messageProps = new MessageProperties();
-
-            int size = (int) serverMsg.getSize();
-            ByteBuffer body = ByteBuffer.allocate(size);
-            serverMsg.getContent(body, 0);
-            body.flip();
 
+        deliveryProps.setRedelivered(entry.isRedelivered());
 
-            deliveryProps.setExpiration(serverMsg.getExpiration());
-            deliveryProps.setImmediate(serverMsg.isImmediate());
-            deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
-            deliveryProps.setRedelivered(entry.isRedelivered());
-            deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
-            deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+        if(_trace != null && messageProps == null)
+        {
+            messageProps = new MessageProperties();
+        }
 
-            messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
-            messageProps.setContentLength(size);
-            messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
-            if(serverMsg.getMessageHeader().getCorrelationId() != null)
-            {
-                messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
-            }
+        Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
 
-            // TODO - ReplyTo
 
-            Header header = new Header(deliveryProps, messageProps, null);
-            xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
-                        : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
-        }
+        xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+                    : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
 
         boolean excludeDueToFederation = false;
 

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.MessageReference;
 
 public class TransferMessageReference extends MessageReference<MessageTransferMessage>
 {

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Mon Mar 18 22:19:50 2013
@@ -18,9 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 
 public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server;
+package org.apache.qpid.server.protocol.v0_8;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -37,7 +37,6 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
@@ -51,9 +50,8 @@ import org.apache.qpid.framing.MethodReg
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.flow.FlowCreditManager;
@@ -65,19 +63,17 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.protocol.v0_8.ack.UnacknowledgedMessageMapImpl;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -85,7 +81,6 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Mon Mar 18 22:19:50 2013
@@ -18,21 +18,21 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_8;
 
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoredMessage;
 
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
-
 /**
  * A deliverable message.
  */
@@ -150,7 +150,7 @@ public class AMQMessage extends Abstract
 
     }
 
-    public MessagePublishInfo getMessagePublishInfo() throws AMQException
+    public MessagePublishInfo getMessagePublishInfo()
     {
         return getMessageMetaData().getMessagePublishInfo();
     }

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.message.MessageReference;
 
 public class AMQMessageReference extends MessageReference<AMQMessage>
 {

Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.protocol.v0_8;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -51,9 +51,8 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
@@ -62,16 +61,17 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.protocol.AMQNoMethodHandlerException;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.state.AMQState;
+import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
@@ -1157,6 +1157,12 @@ public class AMQProtocolEngine implement
         return getAuthId();
     }
 
+    @Override
+    public String getVirtualHostName()
+    {
+        return _virtualHost == null ? null : _virtualHost.getName();
+    }
+
     public long getSessionCountLimit()
     {
         return getMaximumNumberOfChannels();
@@ -1179,7 +1185,7 @@ public class AMQProtocolEngine implement
 
     public String getAuthId()
     {
-        return getAuthorizedPrincipal().getName();
+        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
     }
 
     public Integer getRemotePID()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org