You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/09 18:40:37 UTC

svn commit: r1566328 [2/3] - in /qpid/branches/java-broker-amqp-1-0-management/java: ./ broker-core/src/main/java/org/apache/qpid/server/message/internal/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid...

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+public class MessageConverter_v0_10_to_Internal implements MessageConverter<MessageTransferMessage, InternalMessage>
+{
+    @Override
+    public Class<MessageTransferMessage> getInputClass()
+    {
+        return MessageTransferMessage.class;
+    }
+
+    @Override
+    public Class<InternalMessage> getOutputClass()
+    {
+        return InternalMessage.class;
+    }
+
+    @Override
+    public InternalMessage convert(MessageTransferMessage serverMessage, VirtualHost vhost)
+    {
+        final String mimeType = serverMessage.getMessageHeader().getMimeType();
+        byte[] data = new byte[(int) serverMessage.getSize()];
+        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+        Object body = convertMessageBody(mimeType, data);
+        MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
+        AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo());
+        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body);
+    }
+
+    private static class DelegatingMessageHeader implements AMQMessageHeader
+    {
+        private final AMQMessageHeader _delegate;
+        private final ReplyTo _replyTo;
+
+
+        private DelegatingMessageHeader(final AMQMessageHeader delegate, final ReplyTo replyTo)
+        {
+            _delegate = delegate;
+            _replyTo = replyTo;
+        }
+
+        @Override
+        public String getCorrelationId()
+        {
+            return _delegate.getCorrelationId();
+        }
+
+        @Override
+        public long getExpiration()
+        {
+            return _delegate.getExpiration();
+        }
+
+        @Override
+        public String getUserId()
+        {
+            return _delegate.getUserId();
+        }
+
+        @Override
+        public String getAppId()
+        {
+            return _delegate.getAppId();
+        }
+
+        @Override
+        public String getMessageId()
+        {
+            return _delegate.getMessageId();
+        }
+
+        @Override
+        public String getMimeType()
+        {
+            return _delegate.getMimeType();
+        }
+
+        @Override
+        public String getEncoding()
+        {
+            return _delegate.getEncoding();
+        }
+
+        @Override
+        public byte getPriority()
+        {
+            return _delegate.getPriority();
+        }
+
+        @Override
+        public long getTimestamp()
+        {
+            return _delegate.getTimestamp();
+        }
+
+        @Override
+        public String getType()
+        {
+            return _delegate.getType();
+        }
+
+        @Override
+        public String getReplyTo()
+        {
+            return _replyTo == null
+                    ? null
+                    : _replyTo.getExchange() == null || _replyTo.getExchange().equals("")
+                        ? _replyTo.getRoutingKey()
+                        : _replyTo.getRoutingKey() == null || _replyTo.getRoutingKey().equals("")
+                            ? _replyTo.getExchange()
+                            : _replyTo.getExchange() + "/" + _replyTo.getRoutingKey();
+        }
+
+        @Override
+        public Object getHeader(final String name)
+        {
+            return _delegate.getHeader(name);
+        }
+
+        @Override
+        public boolean containsHeaders(final Set<String> names)
+        {
+            return _delegate.containsHeaders(names);
+        }
+
+        @Override
+        public boolean containsHeader(final String name)
+        {
+            return _delegate.containsHeader(name);
+        }
+
+        @Override
+        public Collection<String> getHeaderNames()
+        {
+            return _delegate.getHeaderNames();
+        }
+    }
+
+    private static Object convertMessageBody(String mimeType, byte[] data)
+    {
+        if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+        {
+            String text = new String(data);
+            return text;
+        }
+        else if("jms/map-message".equals(mimeType))
+        {
+            TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+            LinkedHashMap map = new LinkedHashMap();
+            final int entries = reader.readIntImpl();
+            for (int i = 0; i < entries; i++)
+            {
+                try
+                {
+                    String propName = reader.readStringImpl();
+                    Object value = reader.readObject();
+
+                    map.put(propName, value);
+                }
+                catch (EOFException e)
+                {
+                    throw new IllegalArgumentException(e);
+                }
+                catch (TypedBytesFormatException e)
+                {
+                    throw new IllegalArgumentException(e);
+                }
+
+            }
+
+            return map;
+
+        }
+        else if("amqp/map".equals(mimeType))
+        {
+            BBDecoder decoder = new BBDecoder();
+            decoder.init(ByteBuffer.wrap(data));
+            final Map<String,Object> map = decoder.readMap();
+
+            return map;
+
+        }
+        else if("amqp/list".equals(mimeType))
+        {
+            BBDecoder decoder = new BBDecoder();
+            decoder.init(ByteBuffer.wrap(data));
+            return decoder.readList();
+        }
+        else if("jms/stream-message".equals(mimeType))
+        {
+            TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+            List list = new ArrayList();
+            while (reader.remaining() != 0)
+            {
+                try
+                {
+                    list.add(reader.readObject());
+                }
+                catch (TypedBytesFormatException e)
+                {
+                    throw new RuntimeException(e);  // TODO - Implement
+                }
+                catch (EOFException e)
+                {
+                    throw new RuntimeException(e);  // TODO - Implement
+                }
+            }
+            return list;
+        }
+        else
+        {
+            return data;
+
+        }
+    }
+
+    @Override
+    public String getType()
+    {
+        return "v0-10 to Internal";
+    }
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter Sun Feb  9 17:40:35 2014
@@ -17,3 +17,5 @@
 # under the License.
 #
 org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10
+org.apache.qpid.server.protocol.v0_10.MessageConverter_Internal_to_v0_10
+org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10_to_Internal

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBEncoder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v0_8 implements MessageConverter<InternalMessage, AMQMessage>
+{
+    private static final int BASIC_CLASS_ID = 60;
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
+    public Class<InternalMessage> getInputClass()
+    {
+        return InternalMessage.class;
+    }
+
+    @Override
+    public Class<AMQMessage> getOutputClass()
+    {
+        return AMQMessage.class;
+    }
+
+    @Override
+    public AMQMessage convert(InternalMessage serverMsg, VirtualHost vhost)
+    {
+        return new AMQMessage(convertToStoredMessage(serverMsg), null);
+    }
+
+    private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg)
+    {
+        final byte[] messageContent = convertToBody(serverMsg.getMessageBody());
+        final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg,
+                                                                    getBodyMimeType(serverMsg.getMessageBody()),
+                                                                    messageContent.length);
+
+        return new StoredMessage<MessageMetaData>()
+        {
+            @Override
+            public MessageMetaData getMetaData()
+            {
+                return messageMetaData_0_8;
+            }
+
+            @Override
+            public long getMessageNumber()
+            {
+                return serverMsg.getMessageNumber();
+            }
+
+            @Override
+            public void addContent(int offsetInMessage, ByteBuffer src)
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getContent(int offsetInMessage, ByteBuffer dst)
+            {
+                int size = messageContent.length - offsetInMessage;
+                if(dst.remaining() < size)
+                {
+                    size = dst.remaining();
+                }
+                ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
+                dst.put(buf);
+                return size;
+            }
+
+            @Override
+            public ByteBuffer getContent(int offsetInMessage, int size)
+            {
+                return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+            }
+
+            @Override
+            public StoreFuture flushToStore()
+            {
+                return StoreFuture.IMMEDIATE_FUTURE;
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    private MessageMetaData convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
+    {
+
+        MessagePublishInfo publishInfo = new MessagePublishInfo()
+                                            {
+                                                @Override
+                                                public AMQShortString getExchange()
+                                                {
+                                                    return null;
+                                                }
+
+                                                @Override
+                                                public void setExchange(final AMQShortString amqShortString)
+                                                {
+                                                    throw new UnsupportedOperationException();
+                                                }
+
+                                                @Override
+                                                public boolean isImmediate()
+                                                {
+                                                    return false;
+                                                }
+
+                                                @Override
+                                                public boolean isMandatory()
+                                                {
+                                                    return false;
+                                                }
+
+                                                @Override
+                                                public AMQShortString getRoutingKey()
+                                                {
+                                                    return null;
+                                                }
+                                            };
+
+
+        final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        props.setAppId(serverMsg.getMessageHeader().getAppId());
+        props.setContentType(bodyMimeType);
+        props.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId());
+        props.setDeliveryMode(serverMsg.isPersistent() ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT);
+        props.setExpiration(serverMsg.getExpiration());
+        props.setMessageId(serverMsg.getMessageHeader().getMessageId());
+        props.setPriority(serverMsg.getMessageHeader().getPriority());
+        props.setReplyTo(serverMsg.getMessageHeader().getReplyTo());
+        props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+        props.setUserId(serverMsg.getMessageHeader().getUserId());
+
+        Map<String,Object> headerProps = new LinkedHashMap<String, Object>();
+
+        for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
+        {
+            headerProps.put(headerName, serverMsg.getMessageHeader().getHeader(headerName));
+        }
+
+        props.setHeaders(FieldTable.convertToFieldTable(headerProps));
+
+        final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+        return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
+    }
+
+
+    @Override
+    public String getType()
+    {
+        return "Internal to v0-8";
+    }
+
+
+    public static byte[] convertToBody(Object object)
+    {
+        if(object instanceof String)
+        {
+            return ((String)object).getBytes(UTF_8);
+        }
+        else if(object instanceof byte[])
+        {
+            return (byte[]) object;
+        }
+        else if(object instanceof Map)
+        {
+            BBEncoder encoder = new BBEncoder(1024);
+            encoder.writeMap((Map)object);
+            ByteBuffer buf = encoder.segment();
+            int remaining = buf.remaining();
+            byte[] data = new byte[remaining];
+            buf.get(data);
+            return data;
+
+        }
+        else if(object instanceof List)
+        {
+            BBEncoder encoder = new BBEncoder(1024);
+            encoder.writeList((List) object);
+            ByteBuffer buf = encoder.segment();
+            int remaining = buf.remaining();
+            byte[] data = new byte[remaining];
+            buf.get(data);
+            return data;
+        }
+        else
+        {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            try
+            {
+                ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+                os.writeObject(object);
+                return bytesOut.toByteArray();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static String getBodyMimeType(Object object)
+    {
+        if(object instanceof String)
+        {
+            return "text/plain";
+        }
+        else if(object instanceof byte[])
+        {
+            return "application/octet-stream";
+        }
+        else if(object instanceof Map)
+        {
+            return "amqp/map";
+        }
+        else if(object instanceof List)
+        {
+            return "amqp/list";
+        }
+        else
+        {
+            return "application/java-object-stream";
+        }
+    }
+
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMessage, InternalMessage>
+{
+    @Override
+    public Class<AMQMessage> getInputClass()
+    {
+        return AMQMessage.class;
+    }
+
+    @Override
+    public Class<InternalMessage> getOutputClass()
+    {
+        return InternalMessage.class;
+    }
+
+    @Override
+    public InternalMessage convert(AMQMessage serverMessage, VirtualHost vhost)
+    {
+        final String mimeType = serverMessage.getMessageHeader().getMimeType();
+        byte[] data = new byte[(int) serverMessage.getSize()];
+        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+        Object body = convertMessageBody(mimeType, data);
+
+        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), body);
+    }
+
+    private static Object convertMessageBody(String mimeType, byte[] data)
+    {
+        if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+        {
+            String text = new String(data);
+            return text;
+        }
+        else if("jms/map-message".equals(mimeType))
+        {
+            TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+            LinkedHashMap map = new LinkedHashMap();
+            final int entries = reader.readIntImpl();
+            for (int i = 0; i < entries; i++)
+            {
+                try
+                {
+                    String propName = reader.readStringImpl();
+                    Object value = reader.readObject();
+
+                    map.put(propName, value);
+                }
+                catch (EOFException e)
+                {
+                    throw new IllegalArgumentException(e);
+                }
+                catch (TypedBytesFormatException e)
+                {
+                    throw new IllegalArgumentException(e);
+                }
+
+            }
+
+            return map;
+
+        }
+        else if("amqp/map".equals(mimeType))
+        {
+            BBDecoder decoder = new BBDecoder();
+            decoder.init(ByteBuffer.wrap(data));
+            final Map<String,Object> map = decoder.readMap();
+
+            return map;
+
+        }
+        else if("amqp/list".equals(mimeType))
+        {
+            BBDecoder decoder = new BBDecoder();
+            decoder.init(ByteBuffer.wrap(data));
+            return decoder.readList();
+        }
+        else if("jms/stream-message".equals(mimeType))
+        {
+            TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+            List list = new ArrayList();
+            while (reader.remaining() != 0)
+            {
+                try
+                {
+                    list.add(reader.readObject());
+                }
+                catch (TypedBytesFormatException e)
+                {
+                    throw new RuntimeException(e);  // TODO - Implement
+                }
+                catch (EOFException e)
+                {
+                    throw new RuntimeException(e);  // TODO - Implement
+                }
+            }
+            return list;
+        }
+        else
+        {
+            return data;
+
+        }
+    }
+
+    @Override
+    public String getType()
+    {
+        return "v0-8 to Internal";
+    }
+}

Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (from r1564820, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&r1=1564820&r2=1566328&rev=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter Sun Feb  9 17:40:35 2014
@@ -16,4 +16,5 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0
+org.apache.qpid.server.protocol.v0_8.MessageConverter_Internal_to_v0_8
+org.apache.qpid.server.protocol.v0_8.MessageConverter_v0_8_to_Internal
\ No newline at end of file

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.server.message.internal.InternalMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<InternalMessage>
+{
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
+    public Class<InternalMessage> getInputClass()
+    {
+        return InternalMessage.class;
+    }
+
+
+    @Override
+    protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage,
+                                                  final SectionEncoder sectionEncoder)
+    {
+        List<Section> sections = new ArrayList<Section>(3);
+        Header header = new Header();
+
+        header.setDurable(serverMessage.isPersistent());
+        header.setPriority(UnsignedByte.valueOf(serverMessage.getMessageHeader().getPriority()));
+        if(serverMessage.getExpiration() != 0l && serverMessage.getArrivalTime() !=0l && serverMessage.getExpiration() >= serverMessage.getArrivalTime())
+        {
+            header.setTtl(UnsignedInteger.valueOf(serverMessage.getExpiration()-serverMessage.getArrivalTime()));
+        }
+
+        sections.add(header);
+
+        Properties properties = new Properties();
+        properties.setCorrelationId(serverMessage.getMessageHeader().getCorrelationId());
+        properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp()));
+        properties.setMessageId(serverMessage.getMessageHeader().getMessageId());
+        final String userId = serverMessage.getMessageHeader().getUserId();
+        if(userId != null)
+        {
+            properties.setUserId(new Binary(userId.getBytes(UTF_8)));
+        }
+        properties.setReplyTo(serverMessage.getMessageHeader().getReplyTo());
+
+        sections.add(properties);
+
+        if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty())
+        {
+            ApplicationProperties applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() );
+            sections.add(applicationProperties);
+        }
+        return new MessageMetaData_1_0(sections, sectionEncoder);
+
+    }
+
+    protected Section getBodySection(final InternalMessage serverMessage, final String mimeType)
+    {
+        return convertToBody(serverMessage.getMessageBody());
+    }
+
+
+    @Override
+    public String getType()
+    {
+        return "Internal to v1-0";
+    }
+
+
+    public Section convertToBody(Object object)
+    {
+        if(object instanceof String)
+        {
+            return new AmqpValue(object);
+        }
+        else if(object instanceof byte[])
+        {
+            return new Data(new Binary((byte[])object));
+        }
+        else if(object instanceof Map)
+        {
+            return new AmqpValue(MessageConverter_to_1_0.fixMapValues((Map)object));
+        }
+        else if(object instanceof List)
+        {
+            return new AmqpValue(MessageConverter_to_1_0.fixListValues((List)object));
+        }
+        else
+        {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            try
+            {
+                ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+                os.writeObject(object);
+                return new Data(new Binary(bytesOut.toByteArray()));
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Sun Feb  9 17:40:35 2014
@@ -156,7 +156,7 @@ public abstract class MessageConverter_t
         }
     }
 
-    private static Map fixMapValues(final Map<String, Object> map)
+    static Map fixMapValues(final Map<String, Object> map)
     {
         for(Map.Entry<String,Object> entry : map.entrySet())
         {
@@ -165,7 +165,7 @@ public abstract class MessageConverter_t
         return map;
     }
 
-    private static Object fixValue(final Object value)
+    static Object fixValue(final Object value)
     {
         if(value instanceof byte[])
         {
@@ -185,7 +185,7 @@ public abstract class MessageConverter_t
         }
     }
 
-    private static List fixListValues(final List list)
+    static List fixListValues(final List list)
     {
         ListIterator iterator = list.listIterator();
         while(iterator.hasNext())
@@ -198,83 +198,88 @@ public abstract class MessageConverter_t
     }
 
     private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
-                                                                      final ServerMessage serverMessage,
+                                                                      final M serverMessage,
                                                                       SectionEncoder sectionEncoder)
     {
-            final String mimeType = serverMessage.getMessageHeader().getMimeType();
-            byte[] data = new byte[(int) serverMessage.getSize()];
-            serverMessage.getContent(ByteBuffer.wrap(data), 0);
-
-            Section bodySection = convertMessageBody(mimeType, data);
-
-            final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
-
-            return new StoredMessage<MessageMetaData_1_0>()
-            {
-                @Override
-                public MessageMetaData_1_0 getMetaData()
-                {
-                    return metaData;
-                }
-
-                @Override
-                public long getMessageNumber()
-                {
-                    return serverMessage.getMessageNumber();
-                }
-
-                @Override
-                public void addContent(int offsetInMessage, ByteBuffer src)
-                {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public int getContent(int offsetInMessage, ByteBuffer dst)
-                {
-                    ByteBuffer buf = allData.duplicate();
-                    buf.position(offsetInMessage);
-                    buf = buf.slice();
-                    int size;
-                    if(dst.remaining()<buf.remaining())
-                    {
-                        buf.limit(dst.remaining());
-                        size = dst.remaining();
-                    }
-                    else
-                    {
-                        size = buf.remaining();
-                    }
-                    dst.put(buf);
-                    return size;
-                }
-
-                @Override
-                public ByteBuffer getContent(int offsetInMessage, int size)
-                {
-                    ByteBuffer buf = allData.duplicate();
-                    buf.position(offsetInMessage);
-                    buf = buf.slice();
-                    if(size < buf.remaining())
+        final String mimeType = serverMessage.getMessageHeader().getMimeType();
+        Section bodySection = getBodySection(serverMessage, mimeType);
+
+        final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
+
+        return new StoredMessage<MessageMetaData_1_0>()
                     {
-                        buf.limit(size);
-                    }
-                    return buf;
-                }
-
-                @Override
-                public StoreFuture flushToStore()
-                {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public void remove()
-                {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
+                        @Override
+                        public MessageMetaData_1_0 getMetaData()
+                        {
+                            return metaData;
+                        }
+
+                        @Override
+                        public long getMessageNumber()
+                        {
+                            return serverMessage.getMessageNumber();
+                        }
+
+                        @Override
+                        public void addContent(int offsetInMessage, ByteBuffer src)
+                        {
+                            throw new UnsupportedOperationException();
+                        }
+
+                        @Override
+                        public int getContent(int offsetInMessage, ByteBuffer dst)
+                        {
+                            ByteBuffer buf = allData.duplicate();
+                            buf.position(offsetInMessage);
+                            buf = buf.slice();
+                            int size;
+                            if(dst.remaining()<buf.remaining())
+                            {
+                                buf.limit(dst.remaining());
+                                size = dst.remaining();
+                            }
+                            else
+                            {
+                                size = buf.remaining();
+                            }
+                            dst.put(buf);
+                            return size;
+                        }
+
+                        @Override
+                        public ByteBuffer getContent(int offsetInMessage, int size)
+                        {
+                            ByteBuffer buf = allData.duplicate();
+                            buf.position(offsetInMessage);
+                            buf = buf.slice();
+                            if(size < buf.remaining())
+                            {
+                                buf.limit(size);
+                            }
+                            return buf;
+                        }
+
+                        @Override
+                        public StoreFuture flushToStore()
+                        {
+                            throw new UnsupportedOperationException();
+                        }
+
+                        @Override
+                        public void remove()
+                        {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+    }
+
+    protected Section getBodySection(final M serverMessage, final String mimeType)
+    {
+        byte[] data = new byte[(int) serverMessage.getSize()];
+        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+        return convertMessageBody(mimeType, data);
+    }
 
     private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
     {

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+public class MessageConverter_v1_0_to_Internal implements MessageConverter<Message_1_0, InternalMessage>
+{
+
+    static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance();
+    static
+    {
+        TYPE_REGISTRY.registerTransportLayer();
+        TYPE_REGISTRY.registerMessagingLayer();
+        TYPE_REGISTRY.registerTransactionLayer();
+        TYPE_REGISTRY.registerSecurityLayer();
+    }
+
+    @Override
+    public Class<Message_1_0> getInputClass()
+    {
+        return Message_1_0.class;
+    }
+
+    @Override
+    public Class<InternalMessage> getOutputClass()
+    {
+        return InternalMessage.class;
+    }
+
+    @Override
+    public InternalMessage convert(Message_1_0 serverMessage, VirtualHost vhost)
+    {
+        final String mimeType = serverMessage.getMessageHeader().getMimeType();
+
+
+
+
+        byte[] data = new byte[(int) serverMessage.getSize()];
+        serverMessage.getStoredMessage().getContent(0,ByteBuffer.wrap(data));
+
+        SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(TYPE_REGISTRY);
+
+        try
+        {
+            List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data));
+            ListIterator<Section> iterator = sections.listIterator();
+            Section previousSection = null;
+            while(iterator.hasNext())
+            {
+                Section section = iterator.next();
+                if(!(section instanceof AmqpValue  || section instanceof Data || section instanceof AmqpSequence))
+                {
+                    iterator.remove();
+                }
+                else
+                {
+                    if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
+                    {
+                        throw new RuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
+                    }
+                    else
+                    {
+                        previousSection = section;
+                    }
+                }
+            }
+
+            Object bodyObject;
+
+            if(sections.isEmpty())
+            {
+                // should actually be illegal
+                bodyObject = new byte[0];
+            }
+            else
+            {
+                Section firstBodySection = sections.get(0);
+                if(firstBodySection instanceof AmqpValue)
+                {
+                    bodyObject = fixObject(((AmqpValue)firstBodySection).getValue());
+                }
+                else if(firstBodySection instanceof Data)
+                {
+                    int totalSize = 0;
+                    for(Section section : sections)
+                    {
+                        totalSize += ((Data)section).getValue().getLength();
+                    }
+                    byte[] bodyData = new byte[totalSize];
+                    ByteBuffer buf = ByteBuffer.wrap(bodyData);
+                    for(Section section : sections)
+                    {
+                        buf.put(((Data)section).getValue().asByteBuffer());
+                    }
+                    bodyObject = bodyData;
+                }
+                else
+                {
+                    ArrayList totalSequence = new ArrayList();
+                    for(Section section : sections)
+                    {
+                        totalSequence.addAll(((AmqpSequence)section).getValue());
+                    }
+                    bodyObject = fixObject(totalSequence);
+                }
+            }
+            return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject);
+
+        }
+        catch (AmqpErrorException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+
+
+
+    }
+
+    private Object fixObject(final Object value)
+    {
+        if(value instanceof Binary)
+        {
+            final Binary binaryValue = (Binary) value;
+            byte[] data = new byte[binaryValue.getLength()];
+            binaryValue.asByteBuffer().get(data);
+            return data;
+        }
+        else if(value instanceof List)
+        {
+            List listValue = (List) value;
+            List fixedValue = new ArrayList(listValue.size());
+            for(Object o : listValue)
+            {
+                fixedValue.add(fixObject(o));
+            }
+            return fixedValue;
+        }
+        else if(value instanceof Map)
+        {
+            Map<?,?> mapValue = (Map) value;
+            Map fixedValue = new LinkedHashMap(mapValue.size());
+            for(Map.Entry<?,?> entry : mapValue.entrySet())
+            {
+                fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue()));
+            }
+            return fixedValue;
+        }
+        else
+        {
+            return value;
+        }
+
+    }
+
+    private static Object convertMessageBody(String mimeType, byte[] data)
+    {
+        if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+        {
+            String text = new String(data);
+            return text;
+        }
+        else if("jms/map-message".equals(mimeType))
+        {
+            TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+            LinkedHashMap map = new LinkedHashMap();
+            final int entries = reader.readIntImpl();
+            for (int i = 0; i < entries; i++)
+            {
+                try
+                {
+                    String propName = reader.readStringImpl();
+                    Object value = reader.readObject();
+
+                    map.put(propName, value);
+                }
+                catch (EOFException e)
+                {
+                    throw new IllegalArgumentException(e);
+                }
+                catch (TypedBytesFormatException e)
+                {
+                    throw new IllegalArgumentException(e);
+                }
+
+            }
+
+            return map;
+
+        }
+        else if("amqp/map".equals(mimeType))
+        {
+            BBDecoder decoder = new BBDecoder();
+            decoder.init(ByteBuffer.wrap(data));
+            final Map<String,Object> map = decoder.readMap();
+
+            return map;
+
+        }
+        else if("amqp/list".equals(mimeType))
+        {
+            BBDecoder decoder = new BBDecoder();
+            decoder.init(ByteBuffer.wrap(data));
+            return decoder.readList();
+        }
+        else if("jms/stream-message".equals(mimeType))
+        {
+            TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+            List list = new ArrayList();
+            while (reader.remaining() != 0)
+            {
+                try
+                {
+                    list.add(reader.readObject());
+                }
+                catch (TypedBytesFormatException e)
+                {
+                    throw new RuntimeException(e);  // TODO - Implement
+                }
+                catch (EOFException e)
+                {
+                    throw new RuntimeException(e);  // TODO - Implement
+                }
+            }
+            return list;
+        }
+        else
+        {
+            return data;
+
+        }
+    }
+
+    @Override
+    public String getType()
+    {
+        return "v0-8 to Internal";
+    }
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sun Feb  9 17:40:35 2014
@@ -391,10 +391,6 @@ public class SendingLink_1_0 implements 
                 options.add(Consumer.Option.NO_LOCAL);
             }
 
-
-            _consumer.setNoLocal(noLocal);
-
-
             try
             {
                 _consumer = _queue.addConsumer(_target,

Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (from r1564820, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&r1=1564820&r2=1566328&rev=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter Sun Feb  9 17:40:35 2014
@@ -16,4 +16,5 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0
+org.apache.qpid.server.protocol.v1_0.MessageConverter_Internal_to_v1_0
+org.apache.qpid.server.protocol.v1_0.MessageConverter_v1_0_to_Internal
\ No newline at end of file

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Sun Feb  9 17:40:35 2014
@@ -61,7 +61,7 @@ public class MessageConverter_0_10_to_0_
         {
             if(deliveryProps.hasDeliveryMode())
             {
-                props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
+                props.setDeliveryMode((deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
                                               ? BasicContentHeaderProperties.PERSISTENT
                                               : BasicContentHeaderProperties.NON_PERSISTENT));
             }

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml Sun Feb  9 17:40:35 2014
@@ -0,0 +1,32 @@
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ - 
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ - 
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -->
+<project name="Qpid Broker-Plugins AMQP Management" default="build">
+    <property name="module.depends" value="common broker-core" />
+    <property name="module.test.depends" value="qpid-test-utils broker-core/tests" />
+
+    <property name="module.genpom" value="true"/>
+    <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker-core=provided"/>
+
+    <property name="broker.plugin" value="true"/>
+    <property name="broker-plugins-management-amqp.libs" value="" />
+
+    <import file="../../module.xml" />
+
+    <target name="bundle" depends="bundle-tasks"/>
+</project>

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml Sun Feb  9 17:40:35 2014
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>qpid-parent</artifactId>
+        <groupId>org.apache.qpid</groupId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>qpid-broker-plugins-management-amqp</artifactId>
+    <version>0.28-SNAPSHOT</version>
+    <name>AMQP Management Protocol Plug-in</name>
+    <description>AMQP Management broker plug-in</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        </dependencies>
+
+    <build>
+    </build>
+
+</project>

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import java.util.Arrays;
+
+class ManagedEntityType
+{
+    private final String _name;
+    private final ManagedEntityType[] _parents;
+    private final String[] _attributes;
+    private final String[] _operations;
+
+    ManagedEntityType(final String name,
+                      final ManagedEntityType[] parents,
+                      final String[] attributes,
+                      final String[] operations)
+    {
+        _name = name;
+        _parents = parents;
+        _attributes = attributes;
+        _operations = operations;
+    }
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    public ManagedEntityType[] getParents()
+    {
+        return _parents;
+    }
+
+    public String[] getAttributes()
+    {
+        return _attributes;
+    }
+
+    public String[] getOperations()
+    {
+        return _operations;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ManagedEntityType{" +
+               "name='" + _name + '\'' +
+               ", parents=" + Arrays.toString(_parents) +
+               ", attributes=" + Arrays.toString(_attributes) +
+               ", operations=" + Arrays.toString(_operations) +
+               '}';
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org