You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/03/27 20:24:12 UTC

[qpid-broker-j] branch master updated: QPID-8286: [Broker-J] Add operation into priority queue to change message priority

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/master by this push:
     new 159a753  QPID-8286: [Broker-J] Add operation into priority queue to change message priority
159a753 is described below

commit 159a75361cc584b28d92dec2d0abce6a311480f5
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Mar 27 00:54:32 2019 +0000

    QPID-8286: [Broker-J] Add operation into priority queue to change message priority
    
    This closes #22
---
 .../ServerMessageMutator.java}                     |  20 +-
 .../message/ServerMessageMutatorFactory.java       |  59 ++++++
 .../message/internal/InternalMessageMutator.java   |  88 ++++++++
 .../internal/InternalMessageMutatorFactory.java}   |  32 +--
 .../apache/qpid/server/queue/PriorityQueue.java    |  39 ++++
 .../qpid/server/queue/PriorityQueueImpl.java       | 167 +++++++++++++--
 .../internal/InternalMessageMutatorTest.java       | 123 +++++++++++
 .../qpid/server/queue/PriorityQueueTest.java       | 178 ++++++++++++++--
 .../v0_10/MessageTransferMessageMutator.java       | 136 +++++++++++++
 .../MessageTransferMessageMutatorFactory.java      |  43 ++++
 .../v0_10/MessageTransferMessageMutatorTest.java   | 118 +++++++++++
 .../server/protocol/v0_8/AMQMessageMutator.java    |  71 +++++++
 .../protocol/v0_8/AMQMessageMutatorFactory.java    |  32 +--
 .../protocol/v0_8/AMQMessageMutatorTest.java       | 121 +++++++++++
 .../server/protocol/v1_0/Message_1_0_Mutator.java  | 224 +++++++++++++++++++++
 .../protocol/v1_0/Message_1_0_MutatorFactory.java  |  31 +--
 .../protocol/v1_0/Message_1_0_MutatorTest.java     | 140 +++++++++++++
 .../endtoend/message/MessageManagementTest.java    | 119 +++++++++++
 18 files changed, 1649 insertions(+), 92 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutator.java
similarity index 56%
copy from broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
copy to broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutator.java
index b2ac9d7..1111708 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutator.java
@@ -18,22 +18,14 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.message;
 
-import org.apache.qpid.server.model.ManagedAttribute;
-import org.apache.qpid.server.model.ManagedContextDefault;
-import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.Queue;
-
-@ManagedObject( category = false, type="priority",
-        amqpName = "org.apache.qpid.PriorityQueue" )
-public interface PriorityQueue<X extends PriorityQueue<X>> extends Queue<X>
+public interface ServerMessageMutator<T extends ServerMessage>
 {
-    String PRIORITIES = "priorities";
+    void setPriority(byte priority);
+
+    byte getPriority();
 
-    @ManagedContextDefault( name = "queue.priorities")
-    int DEFAULT_PRIORITY_LEVELS = 10;
+    T create();
 
-    @ManagedAttribute( defaultValue = "${queue.priorities}")
-    int getPriorities();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutatorFactory.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutatorFactory.java
new file mode 100644
index 0000000..46055db
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutatorFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.qpid.server.plugin.Pluggable;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.store.MessageStore;
+
+public interface ServerMessageMutatorFactory<T extends ServerMessage> extends Pluggable
+{
+    ServerMessageMutator<T> create(T serverMessage, MessageStore messageStore);
+
+    static <T extends ServerMessage> ServerMessageMutator<T> createMutator(T serverMessage, MessageStore messageStore)
+    {
+        final ServerMessageMutatorFactory<T> factory =
+                ServerMessageMutatorFactoryRegistry.get(serverMessage.getClass().getName());
+        if (factory == null)
+        {
+            throw new IllegalStateException(String.format("Cannot find server message mutator for message class '%s'",
+                                                          serverMessage.getClass().getName()));
+        }
+        return factory.create(serverMessage, messageStore);
+    }
+
+    class ServerMessageMutatorFactoryRegistry
+    {
+        private static Map<String, ServerMessageMutatorFactory> MUTATOR_FACTORIES =
+                StreamSupport.stream(new QpidServiceLoader().instancesOf(ServerMessageMutatorFactory.class)
+                                                            .spliterator(), false).collect(
+                        Collectors.toMap(Pluggable::getType, i -> i));
+
+        private static ServerMessageMutatorFactory get(String type)
+        {
+            return MUTATOR_FACTORIES.get(type);
+        }
+    }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutator.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutator.java
new file mode 100644
index 0000000..e806678
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutator.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message.internal;
+
+import java.util.HashMap;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+
+public class InternalMessageMutator implements ServerMessageMutator<InternalMessage>
+{
+    private final InternalMessage _message;
+    private final MessageStore _messageStore;
+    private byte _priority;
+
+    InternalMessageMutator(final InternalMessage message, final MessageStore messageStore)
+    {
+        _message = message;
+        _messageStore = messageStore;
+        final InternalMessageHeader messageHeader = _message.getMessageHeader();
+        _priority = messageHeader.getPriority();
+    }
+
+    @Override
+    public void setPriority(final byte priority)
+    {
+        _priority = priority;
+    }
+
+    @Override
+    public byte getPriority()
+    {
+        return _priority;
+    }
+
+    @Override
+    public InternalMessage create()
+    {
+        final InternalMessageHeader messageHeader = _message.getMessageHeader();
+        final InternalMessageHeader newHeader = new InternalMessageHeader(new HashMap<>(messageHeader.getHeaderMap()),
+                                                                          messageHeader.getCorrelationId(),
+                                                                          messageHeader.getExpiration(),
+                                                                          messageHeader.getUserId(),
+                                                                          messageHeader.getAppId(),
+                                                                          messageHeader.getMessageId(),
+                                                                          messageHeader.getMimeType(),
+                                                                          messageHeader.getEncoding(),
+                                                                          _priority,
+                                                                          messageHeader.getTimestamp(),
+                                                                          messageHeader.getNotValidBefore(),
+                                                                          messageHeader.getType(),
+                                                                          messageHeader.getReplyTo(),
+                                                                          _message.getArrivalTime());
+
+        final long contentSize = _message.getSize();
+        final InternalMessageMetaData metaData =
+                InternalMessageMetaData.create(_message.isPersistent(), newHeader, (int) contentSize);
+        final MessageHandle<InternalMessageMetaData> handle = _messageStore.addMessage(metaData);
+        final QpidByteBuffer content = _message.getContent();
+        if (content != null)
+        {
+            handle.addContent(content);
+        }
+        final StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
+        return new InternalMessage(storedMessage, newHeader, _message.getMessageBody(), _message.getTo());
+    }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutatorFactory.java
similarity index 51%
copy from broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
copy to broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutatorFactory.java
index b2ac9d7..9f5a91a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutatorFactory.java
@@ -18,22 +18,26 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.message.internal;
 
-import org.apache.qpid.server.model.ManagedAttribute;
-import org.apache.qpid.server.model.ManagedContextDefault;
-import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
 
-@ManagedObject( category = false, type="priority",
-        amqpName = "org.apache.qpid.PriorityQueue" )
-public interface PriorityQueue<X extends PriorityQueue<X>> extends Queue<X>
+@PluggableService
+public class InternalMessageMutatorFactory implements ServerMessageMutatorFactory<InternalMessage>
 {
-    String PRIORITIES = "priorities";
+    @Override
+    public ServerMessageMutator<InternalMessage> create(final InternalMessage serverMessage,
+                                                        final MessageStore messageStore)
+    {
+        return new InternalMessageMutator(serverMessage, messageStore);
+    }
 
-    @ManagedContextDefault( name = "queue.priorities")
-    int DEFAULT_PRIORITY_LEVELS = 10;
-
-    @ManagedAttribute( defaultValue = "${queue.priorities}")
-    int getPriorities();
+    @Override
+    public String getType()
+    {
+        return InternalMessage.class.getName();
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
index b2ac9d7..d290b24 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
@@ -20,9 +20,13 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.List;
+
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedOperation;
+import org.apache.qpid.server.model.Param;
 import org.apache.qpid.server.model.Queue;
 
 @ManagedObject( category = false, type="priority",
@@ -36,4 +40,39 @@ public interface PriorityQueue<X extends PriorityQueue<X>> extends Queue<X>
 
     @ManagedAttribute( defaultValue = "${queue.priorities}")
     int getPriorities();
+
+    /**
+     * Re-enqueue the message with given id as a new message with priority changed to specified one.
+     * <br>
+     * The operation results in a deletion of original message and creation of new message
+     * which is a copy of original one except for different message id and priority.
+     *
+     * @param messageId message id
+     * @param newPriority new priority
+     * @return the new message id, or -1 if the message is not found or its priority is not changed
+     */
+    @ManagedOperation(description = "Change the priority of the message with given message ID",
+                      nonModifying = true,
+                      changesConfiguredObjectState = false)
+    long reenqueueMessageForPriorityChange(@Param(name = "messageId", description = "A message ID") long messageId,
+                                           @Param(name = "newPriority", description = "the new priority") int newPriority);
+
+    /**
+     * Re-enqueue the messages matching the given selector expression as new messages having their priority changed to the specified one.
+     * <br>
+     * Using {@code null} or an empty filter will change <em>all</em> messages from this queue.
+     * <br>
+     * The operation results in a deletion of original messages and creation of new messages
+     * having the same properties and content as original ones except for different message id and priority.
+     *
+     * @param selector selector expression
+     * @param newPriority new priority
+     * @return the list containing ids of re-enqueued messages
+     */
+    @ManagedOperation(description = "Change the priority of the messages matching the given selector expression",
+                      nonModifying = true,
+                      changesConfiguredObjectState = false)
+    List<Long> reenqueueMessagesForPriorityChange(@Param(name = "selector", description = "A message selector (can be empty)") String selector,
+                                                  @Param(name = "newPriority", description = "the new priority") int newPriority);
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
index e492a7b..fee4228 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
@@ -1,32 +1,48 @@
 /*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.queue;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
 
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.SelectorParsingException;
+import org.apache.qpid.server.filter.selector.ParseException;
+import org.apache.qpid.server.filter.selector.TokenMgrError;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.ManagedAttributeField;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implements PriorityQueue<PriorityQueueImpl>
@@ -76,4 +92,117 @@ public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implem
                                      true);
     }
 
+    @Override
+    public long reenqueueMessageForPriorityChange(final long messageId, final int newPriority)
+    {
+        final QueueEntry entry = getMessageOnTheQueue(messageId);
+        if (entry != null)
+        {
+            final ServerMessage message = entry.getMessage();
+            if (message != null && message.getMessageHeader().getPriority() != newPriority && entry.acquire())
+            {
+                final MessageStore store = getVirtualHost().getMessageStore();
+                final LocalTransaction txn = new LocalTransaction(store);
+                final long newMessageId = reenqueueEntryWithPriority(entry, txn, (byte) newPriority);
+                txn.commit();
+                return newMessageId;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public List<Long> reenqueueMessagesForPriorityChange(final String selector, final int newPriority)
+    {
+        final JMSSelectorFilter filter;
+        try
+        {
+            filter = selector == null ? null : new JMSSelectorFilter(selector);
+        }
+        catch (ParseException | SelectorParsingException | TokenMgrError e)
+        {
+            throw new IllegalArgumentException("Cannot parse selector \"" + selector + "\"", e);
+        }
+
+        final List<Long> messageIds =
+                reenqueueEntriesForPriorityChange(entry -> filter == null || filter.matches(entry.asFilterable()),
+                                                  newPriority);
+        return Collections.unmodifiableList(messageIds);
+    }
+
+    private List<Long> reenqueueEntriesForPriorityChange(final Predicate<QueueEntry> condition,
+                                                         final int newPriority)
+    {
+        final Predicate<QueueEntry> isNotNullMessageAndPriorityDiffers = entry -> {
+            final ServerMessage message = entry.getMessage();
+            return message != null && message.getMessageHeader().getPriority() != newPriority;
+        };
+        return handleMessagesWithinStoreTransaction(isNotNullMessageAndPriorityDiffers.and(condition),
+                                                    (txn, entry) -> reenqueueEntryWithPriority(entry, txn, (byte) newPriority));
+    }
+
+    private long reenqueueEntryWithPriority(final QueueEntry entry,
+                                            final ServerTransaction txn,
+                                            final byte newPriority)
+    {
+        txn.dequeue(entry.getEnqueueRecord(),
+                    new ServerTransaction.Action()
+                    {
+                        @Override
+                        public void postCommit()
+                        {
+                            entry.delete();
+                        }
+
+                        @Override
+                        public void onRollback()
+                        {
+                            entry.release();
+                        }
+                    });
+
+        final ServerMessage newMessage = createMessageWithPriority(entry.getMessage(), newPriority);
+        txn.enqueue(this,
+                    newMessage,
+                    new ServerTransaction.EnqueueAction()
+                    {
+                        @Override
+                        public void postCommit(MessageEnqueueRecord... records)
+                        {
+                            PriorityQueueImpl.this.enqueue(newMessage, null, records[0]);
+                        }
+
+                        @Override
+                        public void onRollback()
+                        {
+                            // noop
+                        }
+                    });
+        return newMessage.getMessageNumber();
+    }
+
+    private List<Long> handleMessagesWithinStoreTransaction(final Predicate<QueueEntry> entryMatchCondition,
+                                                            final BiFunction<ServerTransaction, QueueEntry, Long> handle)
+    {
+        final MessageStore store = getVirtualHost().getMessageStore();
+        final LocalTransaction txn = new LocalTransaction(store);
+        final List<Long> result = new ArrayList<>();
+        visit(entry -> {
+            if (entryMatchCondition.test(entry) && entry.acquire())
+            {
+                result.add(handle.apply(txn, entry));
+            }
+            return false;
+        });
+        txn.commit();
+        return result;
+    }
+
+    private ServerMessage createMessageWithPriority(final ServerMessage message, final byte newPriority)
+    {
+        final ServerMessageMutator messageMutator =
+                ServerMessageMutatorFactory.createMutator(message, getVirtualHost().getMessageStore());
+        messageMutator.setPriority(newPriority);
+        return messageMutator.create();
+    }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/message/internal/InternalMessageMutatorTest.java b/broker-core/src/test/java/org/apache/qpid/server/message/internal/InternalMessageMutatorTest.java
new file mode 100644
index 0000000..d1b523d
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/message/internal/InternalMessageMutatorTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message.internal;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class InternalMessageMutatorTest extends UnitTestBase
+{
+    private static final byte TEST_PRIORITY = (byte) 1;
+    private static final String TEST_HEADER_NAME = "foo";
+    private static final String TEST_HEADER_VALUE = "bar";
+    private static final String TEST_CONTENT_TYPE = "text/plain";
+    private static final String TEST_CONTENT = "testContent";
+    private MessageStore _messageStore;
+    private InternalMessageMutator _messageMutator;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _messageStore = new TestMemoryMessageStore();
+        final InternalMessage message = createTestMessage();
+        _messageMutator = new InternalMessageMutator(message, _messageStore);
+    }
+
+    @After
+    public void tearDown()
+    {
+        _messageStore.closeMessageStore();
+    }
+
+    @Test
+    public void setPriority()
+    {
+        _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+        assertThat(_messageMutator.getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+    }
+
+    @Test
+    public void getPriority()
+    {
+        assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY)));
+    }
+
+    @Test
+    public void create()
+    {
+        _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+
+        final InternalMessage newMessage = _messageMutator.create();
+
+        assertThat(newMessage.getMessageHeader().getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+        assertThat(newMessage.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE)));
+        assertThat(newMessage.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+        final QpidByteBuffer content = newMessage.getContent();
+
+        final byte[] bytes = new byte[content.remaining()];
+        content.copyTo(bytes);
+        assertThat(new String(bytes, UTF_8), is(equalTo(TEST_CONTENT)));
+    }
+
+    private InternalMessage createTestMessage()
+    {
+        final QpidByteBuffer content = QpidByteBuffer.wrap(TEST_CONTENT.getBytes(UTF_8));
+        final InternalMessageHeader newHeader =
+                new InternalMessageHeader(Collections.singletonMap(TEST_HEADER_NAME, TEST_HEADER_VALUE),
+                                          null,
+                                          0,
+                                          null,
+                                          null,
+                                          null,
+                                          TEST_CONTENT_TYPE,
+                                          null,
+                                          TEST_PRIORITY,
+                                          System.currentTimeMillis(),
+                                          0,
+                                          null,
+                                          null,
+                                          System.currentTimeMillis());
+
+        final long contentSize = content.remaining();
+        final InternalMessageMetaData metaData =
+                InternalMessageMetaData.create(false, newHeader, (int) contentSize);
+        final MessageHandle<InternalMessageMetaData> handle = _messageStore.addMessage(metaData);
+        handle.addContent(content);
+        final StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
+        return new InternalMessage(storedMessage, newHeader, TEST_CONTENT, "test");
+    }
+}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
index d66892d..fa3468e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -21,11 +21,12 @@
 package org.apache.qpid.server.queue;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 
 import junit.framework.AssertionFailedError;
 import org.junit.Before;
@@ -35,6 +36,13 @@ import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.StoredMessage;
+
 public class PriorityQueueTest extends AbstractQueueTestBase
 {
 
@@ -46,11 +54,11 @@ public class PriorityQueueTest extends AbstractQueueTestBase
     }
 
     @Test
-    public void testPriorityOrdering() throws Exception, InterruptedException
+    public void testPriorityOrdering() throws Exception
     {
 
         // Enqueue messages in order
-        AbstractQueue queue = (AbstractQueue) getQueue();
+        PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
         queue.enqueue(createMessage(1L, (byte) 10), null, null);
         queue.enqueue(createMessage(2L, (byte) 4), null, null);
         queue.enqueue(createMessage(3L, (byte) 0), null, null);
@@ -65,12 +73,7 @@ public class PriorityQueueTest extends AbstractQueueTestBase
         queue.enqueue(createMessage(8L, (byte) 10), null, null);
         queue.enqueue(createMessage(9L, (byte) 0), null, null);
 
-        // Register subscriber
-        queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerOption.class), 0);
-
-        while(getConsumer().processPending());
-
-        ArrayList<MessageInstance> msgs = getConsumer().getMessages();
+        final List<MessageInstance> msgs = consumeMessages(queue);;
         try
         {
             assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
@@ -88,16 +91,157 @@ public class PriorityQueueTest extends AbstractQueueTestBase
         catch (AssertionFailedError afe)
         {
             // Show message order on failure.
-            int index = 1;
-            for (MessageInstance qe : msgs)
-            {
-                System.err.println(index + ":" + qe.getMessage().getMessageNumber());
-                index++;
-            }
-
-            throw afe;
+            showMessageOrderOnFailure(msgs, afe);
+        }
+
+    }
+
+    @Test
+    public void changeMessagePriority() throws Exception
+    {
+        final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+        final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+        final InternalMessage internalMessage2 = createInternalMessage((byte) 3, 1);
+        final InternalMessage internalMessage3 = createInternalMessage((byte) 4, 2);
+        queue.enqueue(internalMessage1, null, null);
+        queue.enqueue(internalMessage2, null, null);
+        queue.enqueue(internalMessage3, null, null);
+
+        final long result = queue.reenqueueMessageForPriorityChange(internalMessage2.getMessageNumber(), (byte)5); // raise message 2 from priority 3 to 5
+        assertEquals("Unexpected operation result", internalMessage3.getMessageNumber() + 1, result); // expected: number following the last enqueued message
+
+        final List<MessageInstance> msgs = consumeMessages(queue);
+        try
+        {
+            assertEquals(internalMessage3.getMessageNumber() + 1, msgs.get(0).getMessage().getMessageNumber()); // re-enqueued message comes first
+            assertEquals(internalMessage3.getMessageNumber(), msgs.get(1).getMessage().getMessageNumber());
+            assertEquals(internalMessage1.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber());
+        }
+        catch (AssertionError afe) // org.junit.Assert raises AssertionError, not junit.framework.AssertionFailedError
+        {
+            showMessageOrderOnFailure(msgs, afe);
+        }
+    }
+
+    @Test
+    public void changeMessagePriorityForNonExistingMessageId() throws Exception
+    {
+        final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+        final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+        final InternalMessage internalMessage2 = createInternalMessage((byte) 5, 1);
+        final InternalMessage internalMessage3 = createInternalMessage((byte) 4, 2);
+        queue.enqueue(internalMessage1, null, null);
+        queue.enqueue(internalMessage2, null, null);
+        queue.enqueue(internalMessage3, null, null);
+
+        final long result = queue.reenqueueMessageForPriorityChange(internalMessage3.getMessageNumber() + 1, (byte)6); // number not used by any enqueued message
+        assertEquals("Unexpected operation result", -1, result); // -1 is the expected result when nothing matches
+
+        final List<MessageInstance> msgs = consumeMessages(queue);
+        try
+        {
+            assertEquals(internalMessage2.getMessageNumber(), msgs.get(0).getMessage().getMessageNumber()); // original order by priority: 5, 4, 3
+            assertEquals(internalMessage3.getMessageNumber(), msgs.get(1).getMessage().getMessageNumber());
+            assertEquals(internalMessage1.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber());
+        }
+        catch (AssertionError afe) // org.junit.Assert raises AssertionError, not junit.framework.AssertionFailedError
+        {
+            showMessageOrderOnFailure(msgs, afe);
+        }
+    }
+
+    @Test
+    public void changeMessagesPriority() throws Exception
+    {
+        final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+        final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+        final InternalMessage internalMessage2 = createInternalMessage((byte) 3, 1);
+        final InternalMessage internalMessage3 = createInternalMessage((byte) 4, 2);
+        queue.enqueue(internalMessage1, null, null);
+        queue.enqueue(internalMessage2, null, null);
+        queue.enqueue(internalMessage3, null, null);
+
+        final List<Long> result = queue.reenqueueMessagesForPriorityChange("id in ('2','0')", (byte)5); // selector on the "id" header set by createInternalMessage
+        assertEquals("Unexpected operation result", 2, result.size()); // both selected messages are expected to be re-enqueued
+
+        final List<MessageInstance> msgs = consumeMessages(queue);
+        try
+        {
+            assertEquals(internalMessage3.getMessageNumber() + 1, msgs.get(0).getMessage().getMessageNumber());
+            assertEquals("2", msgs.get(0).getMessage().getMessageHeader().getHeader("id"));
+            assertEquals(internalMessage3.getMessageNumber() + 2, msgs.get(1).getMessage().getMessageNumber());
+            assertEquals("0", msgs.get(1).getMessage().getMessageHeader().getHeader("id"));
+            assertEquals(internalMessage2.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber()); // untouched message keeps its number and comes last
+            assertEquals("1", msgs.get(2).getMessage().getMessageHeader().getHeader("id"));
+        }
+        catch (AssertionError afe) // org.junit.Assert raises AssertionError, not junit.framework.AssertionFailedError
+        {
+            showMessageOrderOnFailure(msgs, afe);
+        }
+    }
+
+    @Test
+    public void changeMessagesPriorityForNonExistingMessageId() throws Exception
+    {
+        final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+        final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+        final InternalMessage internalMessage2 = createInternalMessage((byte) 4, 1);
+        final InternalMessage internalMessage3 = createInternalMessage((byte) 3, 2);
+        queue.enqueue(internalMessage1, null, null);
+        queue.enqueue(internalMessage2, null, null);
+        queue.enqueue(internalMessage3, null, null);
+
+        final List<Long> result = queue.reenqueueMessagesForPriorityChange("id in ('3','2')", (byte)5); // no message carries id '3'
+        assertEquals("Unexpected operation result", 1, result.size()); // only the message with id '2' is expected to match
+
+        final List<MessageInstance> msgs = consumeMessages(queue);
+        try
+        {
+            assertEquals(internalMessage3.getMessageNumber() + 1, msgs.get(0).getMessage().getMessageNumber());
+            assertEquals("2", msgs.get(0).getMessage().getMessageHeader().getHeader("id"));
+            assertEquals(internalMessage2.getMessageNumber(), msgs.get(1).getMessage().getMessageNumber());
+            assertEquals("1", msgs.get(1).getMessage().getMessageHeader().getHeader("id"));
+            assertEquals(internalMessage1.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber());
+            assertEquals("0", msgs.get(2).getMessage().getMessageHeader().getHeader("id"));
         }
+        catch (AssertionError afe) // org.junit.Assert raises AssertionError, not junit.framework.AssertionFailedError
+        {
+            showMessageOrderOnFailure(msgs, afe);
+        }
+    }
 
+    private List<MessageInstance> consumeMessages(final Queue queue) // registers the test consumer and returns everything it received
+            throws Exception
+    {
+        queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerOption.class), 0);
+
+        while(getConsumer().processPending()); // empty body: repeat until processPending() returns false
+        return getConsumer().getMessages();
+    }
+
+    private void showMessageOrderOnFailure(final List<MessageInstance> msgs, final AssertionError afe) // AssertionError also accepts AssertionFailedError callers
+    {
+        int index = 1;
+        for (MessageInstance qe : msgs)
+        {
+            System.err.println(index + ":" + qe.getMessage().getMessageNumber()); // 1-based position and message number
+            index++;
+        }
+
+        throw afe; // rethrow so the test still fails after printing the order
+    }
+
+    private InternalMessage createInternalMessage(byte priority, int index) // test message with the given priority and an "id" header
+    {
+        final AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+        when(messageHeader.getPriority()).thenReturn(priority);
+        when(messageHeader.getHeader("id")).thenReturn(String.valueOf(index)); // "id" header holds the index as a string
+        when(messageHeader.getHeaderNames()).thenReturn(Collections.singleton("id")); // "id" is the only header name reported
+        final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(messageHeader);
+        final InternalMessageMetaData metaData =  new InternalMessageMetaData(true, internalMessageHeader, 0);
+        MessageHandle<InternalMessageMetaData> handle = getQueue().getVirtualHost().getMessageStore().addMessage(metaData); // added to the store of the queue's virtual host
+        final StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded(); // no content is added before completing
+        return new InternalMessage(storedMessage, internalMessageHeader, null, getQueue().getName());
     }
 
     protected ServerMessage createMessage(Long id, byte i)
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutator.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutator.java
new file mode 100644
index 0000000..ffab8dd
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutator.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+
+public class MessageTransferMessageMutator implements ServerMessageMutator<MessageTransferMessage> // builds a modified copy of a MessageTransferMessage
+{
+    private final MessageTransferMessage _message;
+    private final MessageStore _messageStore;
+    private MessageProperties _messageProperties; // copy of the source message properties, or null if it had none
+    private DeliveryProperties _deliveryProperties; // copy of the source delivery properties, or null until a priority is set
+
+    MessageTransferMessageMutator(final MessageTransferMessage message, final MessageStore messageStore)
+    {
+        _message = message;
+        _messageStore = messageStore;
+        final MessageProperties messageProperties = message.getHeader().getMessageProperties();
+        _messageProperties = messageProperties == null ? null : new MessageProperties(messageProperties);
+        final DeliveryProperties deliveryProperties = _message.getHeader().getDeliveryProperties();
+        DeliveryProperties properties = null;
+        if (deliveryProperties != null)
+        {
+            properties = new DeliveryProperties(); // copied field by field; only fields reported as present are transferred
+            if (deliveryProperties.hasDeliveryMode())
+            {
+                properties.setDeliveryMode(deliveryProperties.getDeliveryMode());
+            }
+            if (deliveryProperties.hasDiscardUnroutable())
+            {
+                properties.setDiscardUnroutable(deliveryProperties.getDiscardUnroutable());
+            }
+            if (deliveryProperties.hasExchange())
+            {
+                properties.setExchange(deliveryProperties.getExchange());
+            }
+            if (deliveryProperties.hasExpiration())
+            {
+                properties.setExpiration(deliveryProperties.getExpiration());
+            }
+            if (deliveryProperties.hasTtl())
+            {
+                properties.setTtl(deliveryProperties.getTtl());
+            }
+            if (deliveryProperties.hasImmediate())
+            {
+                properties.setImmediate(deliveryProperties.getImmediate());
+            }
+            if (deliveryProperties.hasPriority())
+            {
+                properties.setPriority(deliveryProperties.getPriority());
+            }
+            if (deliveryProperties.hasRedelivered())
+            {
+                properties.setRedelivered(deliveryProperties.getRedelivered());
+            }
+            if (deliveryProperties.hasResumeId())
+            {
+                properties.setResumeId(deliveryProperties.getResumeId());
+            }
+            if (deliveryProperties.hasResumeTtl())
+            {
+                properties.setResumeTtl(deliveryProperties.getResumeTtl());
+            }
+            if (deliveryProperties.hasRoutingKey())
+            {
+                properties.setRoutingKey(deliveryProperties.getRoutingKey());
+            }
+            if (deliveryProperties.hasTimestamp())
+            {
+                properties.setTimestamp(deliveryProperties.getTimestamp());
+            }
+        }
+        _deliveryProperties = properties;
+    }
+
+    @Override
+    public void setPriority(final byte priority)
+    {
+        if (_deliveryProperties == null)
+        {
+            _deliveryProperties = new DeliveryProperties(); // source message had no delivery properties: create them on demand
+        }
+        _deliveryProperties.setPriority(MessageDeliveryPriority.get(priority));
+    }
+
+
+    @Override
+    public byte getPriority()
+    {
+        MessageDeliveryPriority priority = _deliveryProperties == null || !_deliveryProperties.hasPriority()
+                ? MessageDeliveryPriority.MEDIUM // reported when no priority has been set
+                : _deliveryProperties.getPriority();
+        return (byte) priority.getValue();
+    }
+
+    @Override
+    public MessageTransferMessage create()
+    {
+        final Header header = new Header(_deliveryProperties, _messageProperties);
+        final MessageMetaData_0_10 messageMetaData =
+                new MessageMetaData_0_10(header, (int) _message.getSize(), _message.getArrivalTime()); // size and arrival time are taken from the source message
+        final QpidByteBuffer content = _message.getContent();
+        final MessageHandle<MessageMetaData_0_10> addedMessage = _messageStore.addMessage(messageMetaData); // a new message is added to the store
+        if (content != null)
+        {
+            addedMessage.addContent(content);
+        }
+        return new MessageTransferMessage(addedMessage.allContentAdded(), _message.getConnectionReference());
+    }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorFactory.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorFactory.java
new file mode 100644
index 0000000..3fac094
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
+
+@PluggableService
+public class MessageTransferMessageMutatorFactory implements ServerMessageMutatorFactory<MessageTransferMessage> // creates mutators for MessageTransferMessage
+{
+    @Override
+    public ServerMessageMutator<MessageTransferMessage> create(final MessageTransferMessage serverMessage,
+                                                               final MessageStore messageStore)
+    {
+        return new MessageTransferMessageMutator(serverMessage, messageStore); // a new mutator per call
+    }
+
+    @Override
+    public String getType()
+    {
+        return MessageTransferMessage.class.getName(); // type key is the fully qualified message class name
+    }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorTest.java
new file mode 100644
index 0000000..5f8327e
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorTest.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class MessageTransferMessageMutatorTest extends UnitTestBase // unit tests for MessageTransferMessageMutator against an in-memory test store
+{
+    private static final short TEST_PRIORITY = (short) 1;
+    private static final String TEST_HEADER_NAME = "foo";
+    private static final String TEST_HEADER_VALUE = "bar";
+    private static final String TEST_CONTENT_TYPE = "text/plain";
+    private static final String TEST_CONTENT = "testContent";
+    private MessageStore _messageStore;
+    private MessageTransferMessageMutator _messageMutator;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _messageStore = new TestMemoryMessageStore();
+        final MessageTransferMessage message = createTestMessage(); // needs _messageStore, so it must be created first
+        _messageMutator = new MessageTransferMessageMutator(message, _messageStore);
+    }
+
+
+    @After
+    public void tearDown()
+    {
+        _messageStore.closeMessageStore();
+    }
+
+    @Test
+    public void setPriority()
+    {
+        _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+        assertThat(_messageMutator.getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1)))); // value set must be readable back from the mutator
+    }
+
+    @Test
+    public void getPriority()
+    {
+        assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY))); // initially the priority of the test message
+    }
+
+    @Test
+    public void create()
+    {
+        _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+
+        MessageTransferMessage newMessage = _messageMutator.create();
+
+        assertThat(newMessage.getMessageHeader().getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1)))); // changed priority is applied
+        assertThat(newMessage.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE))); // remaining properties are carried over
+        assertThat(newMessage.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+        QpidByteBuffer content = newMessage.getContent();
+
+        final byte[] bytes = new byte[content.remaining()];
+        content.copyTo(bytes);
+        assertThat(new String(bytes, UTF_8), is(equalTo(TEST_CONTENT))); // body is carried over unchanged
+    }
+
+    private MessageTransferMessage createTestMessage() // message with TEST_PRIORITY, one application header, a content type and a text body
+    {
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setPriority(MessageDeliveryPriority.get(TEST_PRIORITY));
+        final MessageProperties messageProperties = new MessageProperties();
+
+        messageProperties.setContentType(TEST_CONTENT_TYPE);
+        messageProperties.setApplicationHeaders(Collections.singletonMap(TEST_HEADER_NAME, TEST_HEADER_VALUE));
+
+        final Header header = new Header(deliveryProperties, messageProperties);
+        final QpidByteBuffer content = QpidByteBuffer.wrap(TEST_CONTENT.getBytes(UTF_8));
+        final MessageMetaData_0_10 messageMetaData =
+                new MessageMetaData_0_10(header, content.remaining(), System.currentTimeMillis());
+        final MessageHandle<MessageMetaData_0_10> addedMessage = _messageStore.addMessage(messageMetaData);
+        addedMessage.addContent(content);
+        return new MessageTransferMessage(addedMessage.allContentAdded(), null); // null connection reference
+    }
+
+}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutator.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutator.java
new file mode 100644
index 0000000..9b3a6ac
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutator.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+
+public class AMQMessageMutator implements ServerMessageMutator<AMQMessage> // builds a modified copy of an AMQMessage
+{
+    private final AMQMessage _message;
+    private final MessageStore _messageStore;
+    private final BasicContentHeaderProperties _basicContentHeaderProperties; // separate properties object built from the source message's properties
+
+    AMQMessageMutator(final AMQMessage message, final MessageStore messageStore)
+    {
+        _message = message;
+        _messageStore = messageStore;
+        _basicContentHeaderProperties =
+                new BasicContentHeaderProperties(_message.getContentHeaderBody().getProperties());
+    }
+
+    @Override
+    public void setPriority(final byte priority)
+    {
+        _basicContentHeaderProperties.setPriority(priority); // changes the mutator's own properties object
+    }
+
+    @Override
+    public byte getPriority()
+    {
+        return _basicContentHeaderProperties.getPriority();
+    }
+
+    @Override
+    public AMQMessage create()
+    {
+        final long contentSize = _message.getSize();
+        final QpidByteBuffer content = _message.getContent();
+        final ContentHeaderBody contentHeader = new ContentHeaderBody(_basicContentHeaderProperties, contentSize);
+        final MessageMetaData messageMetaData =
+                new MessageMetaData(_message.getMessagePublishInfo(), contentHeader, _message.getArrivalTime()); // publish info and arrival time are taken from the source message
+        final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); // a new message is added to the store
+        if (content != null)
+        {
+            handle.addContent(content);
+        }
+        return new AMQMessage(handle.allContentAdded(), _message.getConnectionReference());
+    }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorFactory.java
similarity index 52%
copy from broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
copy to broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorFactory.java
index b2ac9d7..b32466f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorFactory.java
@@ -18,22 +18,26 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.server.model.ManagedAttribute;
-import org.apache.qpid.server.model.ManagedContextDefault;
-import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
 
-@ManagedObject( category = false, type="priority",
-        amqpName = "org.apache.qpid.PriorityQueue" )
-public interface PriorityQueue<X extends PriorityQueue<X>> extends Queue<X>
+@PluggableService
+public class AMQMessageMutatorFactory implements ServerMessageMutatorFactory<AMQMessage> // creates mutators for AMQMessage
 {
-    String PRIORITIES = "priorities";
+    @Override
+    public ServerMessageMutator<AMQMessage> create(final AMQMessage serverMessage,
+                                                        final MessageStore messageStore)
+    {
+        return new AMQMessageMutator(serverMessage, messageStore); // a new mutator per call
+    }
 
-    @ManagedContextDefault( name = "queue.priorities")
-    int DEFAULT_PRIORITY_LEVELS = 10;
-
-    @ManagedAttribute( defaultValue = "${queue.priorities}")
-    int getPriorities();
+    @Override
+    public String getType()
+    {
+        return AMQMessage.class.getName(); // type key is the fully qualified message class name
+    }
 }
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorTest.java
new file mode 100644
index 0000000..e5acc91
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorTest.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class AMQMessageMutatorTest extends UnitTestBase // unit tests for AMQMessageMutator against an in-memory test store
+{
+    private static final byte TEST_PRIORITY = (byte) 1;
+    private static final String TEST_HEADER_NAME = "foo";
+    private static final String TEST_HEADER_VALUE = "bar";
+    private static final String TEST_CONTENT_TYPE = "text/plain";
+    private static final String TEST_CONTENT = "testContent";
+    private MessageStore _messageStore;
+    private AMQMessageMutator _messageMutator;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _messageStore = new TestMemoryMessageStore();
+        final AMQMessage message = createTestMessage(); // needs _messageStore, so it must be created first
+        _messageMutator = new AMQMessageMutator(message, _messageStore);
+    }
+
+
+    @After
+    public void tearDown()
+    {
+        _messageStore.closeMessageStore();
+    }
+
+    @Test
+    public void setPriority()
+    {
+        _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+        assertThat(_messageMutator.getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1)))); // value set must be readable back from the mutator
+    }
+
+    @Test
+    public void getPriority()
+    {
+        assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY))); // initially the priority of the test message
+    }
+
+    @Test
+    public void create()
+    {
+        _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+
+        AMQMessage newMessage = _messageMutator.create();
+
+        assertThat(newMessage.getMessageHeader().getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1)))); // changed priority is applied
+        assertThat(newMessage.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE))); // remaining properties are carried over
+        assertThat(newMessage.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+        QpidByteBuffer content = newMessage.getContent();
+
+        final byte[] bytes = new byte[content.remaining()];
+        content.copyTo(bytes);
+        assertThat(new String(bytes, UTF_8), is(equalTo(TEST_CONTENT))); // body is carried over unchanged
+    }
+
+    private AMQMessage createTestMessage() // message with TEST_PRIORITY, one header, a content type and a text body
+    {
+        final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+        basicContentHeaderProperties.setPriority(TEST_PRIORITY);
+        basicContentHeaderProperties.setHeaders(FieldTableFactory.createFieldTable(Collections.singletonMap(
+                TEST_HEADER_NAME,
+                TEST_HEADER_VALUE)));
+        basicContentHeaderProperties.setContentType(TEST_CONTENT_TYPE);
+
+        QpidByteBuffer content = QpidByteBuffer.wrap(TEST_CONTENT.getBytes(UTF_8));
+
+        final ContentHeaderBody contentHeader = new ContentHeaderBody(basicContentHeaderProperties, content.remaining());
+        final MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.valueOf("testExchange"),
+                                                                      true,
+                                                                      true,
+                                                                      AMQShortString.valueOf("testRoutingKey"));
+        final MessageMetaData messageMetaData =
+                new MessageMetaData(publishInfo, contentHeader, System.currentTimeMillis());
+        final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+        handle.addContent(content);
+        return new AMQMessage(handle.allContentAdded()); // single-argument constructor: no connection reference supplied
+    }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_Mutator.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_Mutator.java
new file mode 100644
index 0000000..49434b1
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_Mutator.java
@@ -0,0 +1,224 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+
+/**
+ * {@link ServerMessageMutator} for AMQP 1.0 messages. It takes working copies of the header, annotation,
+ * properties, application-properties and footer sections of an existing {@link Message_1_0}, lets the
+ * priority be changed, and stores a new message built from those copies and the original content.
+ */
+public class Message_1_0_Mutator implements ServerMessageMutator<Message_1_0>
+{
+    // Priority reported when the header carries none; same value as javax.jms.Message.DEFAULT_PRIORITY.
+    private static final byte DEFAULT_PRIORITY = 4;
+
+    private final Message_1_0 _message;
+    private final MessageStore _messageStore;
+
+    // Working copies of the source sections; a field stays null when the section or its value is absent.
+    private Header _header;
+    private Map<Symbol, Object> _deliveryAnnotations;
+    private Map<Symbol, Object> _messageAnnotations;
+    private Properties _properties;
+    private Map<String, Object> _applicationProperties;
+    private Map<Symbol, Object> _footer;
+
+    /**
+     * Copies the value of each section read from the message and disposes every section once it is read.
+     */
+    Message_1_0_Mutator(final Message_1_0 message, final MessageStore messageStore)
+    {
+        _message = message;
+        _messageStore = messageStore;
+
+        final HeaderSection header = message.getHeaderSection();
+        if (header != null)
+        {
+            _header = copyHeader(header.getValue());
+            header.dispose();
+        }
+
+        final DeliveryAnnotationsSection deliveryAnnotations = message.getDeliveryAnnotationsSection();
+        if (deliveryAnnotations != null)
+        {
+            _deliveryAnnotations = copyMap(deliveryAnnotations.getValue());
+            deliveryAnnotations.dispose();
+        }
+
+        final MessageAnnotationsSection messageAnnotations = message.getMessageAnnotationsSection();
+        if (messageAnnotations != null)
+        {
+            _messageAnnotations = copyMap(messageAnnotations.getValue());
+            messageAnnotations.dispose();
+        }
+
+        final PropertiesSection properties = message.getPropertiesSection();
+        if (properties != null)
+        {
+            _properties = copyProperties(properties.getValue());
+            properties.dispose();
+        }
+
+        final ApplicationPropertiesSection applicationProperties = message.getApplicationPropertiesSection();
+        if (applicationProperties != null)
+        {
+            _applicationProperties = copyMap(applicationProperties.getValue());
+            applicationProperties.dispose();
+        }
+
+        final FooterSection footer = message.getFooterSection();
+        if (footer != null)
+        {
+            _footer = copyMap(footer.getValue());
+            footer.dispose();
+        }
+    }
+
+    @Override
+    public void setPriority(final byte priority)
+    {
+        // Create the header lazily: the source message may not have carried one.
+        if (_header == null)
+        {
+            _header = new Header();
+        }
+        _header.setPriority(UnsignedByte.valueOf(priority));
+    }
+
+    @Override
+    public byte getPriority()
+    {
+        final UnsignedByte priority = _header == null ? null : _header.getPriority();
+        return priority == null ? DEFAULT_PRIORITY : priority.byteValue();
+    }
+
+    @Override
+    public Message_1_0 create()
+    {
+        final long contentSize = _message.getSize();
+
+        // Re-encode every section that has a working copy; absent sections stay null in the new metadata.
+        final HeaderSection headerSection =
+                _header == null ? null : _header.createEncodingRetainingSection();
+        final DeliveryAnnotationsSection deliveryAnnotationsSection = _deliveryAnnotations == null
+                ? null
+                : new DeliveryAnnotations(_deliveryAnnotations).createEncodingRetainingSection();
+        final MessageAnnotationsSection messageAnnotationsSection = _messageAnnotations == null
+                ? null
+                : new MessageAnnotations(_messageAnnotations).createEncodingRetainingSection();
+        final PropertiesSection propertiesSection =
+                _properties == null ? null : _properties.createEncodingRetainingSection();
+        final ApplicationPropertiesSection applicationPropertiesSection = _applicationProperties == null
+                ? null
+                : new ApplicationProperties(_applicationProperties).createEncodingRetainingSection();
+        final FooterSection footerSection =
+                _footer == null ? null : new Footer(_footer).createEncodingRetainingSection();
+
+        // NOTE(review): confirm who disposes the buffer returned by getContent() after it has been added.
+        final QpidByteBuffer content = _message.getContent();
+        final MessageMetaData_1_0 metaData = new MessageMetaData_1_0(headerSection,
+                                                                     deliveryAnnotationsSection,
+                                                                     messageAnnotationsSection,
+                                                                     propertiesSection,
+                                                                     applicationPropertiesSection,
+                                                                     footerSection,
+                                                                     _message.getArrivalTime(),
+                                                                     contentSize);
+        final MessageHandle<MessageMetaData_1_0> handle = _messageStore.addMessage(metaData);
+        if (content != null)
+        {
+            handle.addContent(content);
+        }
+        return new Message_1_0(handle.allContentAdded(), _message.getConnectionReference());
+    }
+
+    /** Returns a field-by-field copy of the given header, or null when there is none. */
+    private static Header copyHeader(final Header source)
+    {
+        if (source == null)
+        {
+            return null;
+        }
+        final Header copy = new Header();
+        copy.setDeliveryCount(source.getDeliveryCount());
+        copy.setDurable(source.getDurable());
+        copy.setFirstAcquirer(source.getFirstAcquirer());
+        copy.setPriority(source.getPriority());
+        copy.setTtl(source.getTtl());
+        return copy;
+    }
+
+    /** Returns a field-by-field copy of the given properties, or null when there are none. */
+    private static Properties copyProperties(final Properties source)
+    {
+        if (source == null)
+        {
+            return null;
+        }
+        final Properties copy = new Properties();
+        copy.setCorrelationId(source.getCorrelationId());
+        copy.setAbsoluteExpiryTime(source.getAbsoluteExpiryTime());
+        copy.setContentEncoding(source.getContentEncoding());
+        copy.setContentType(source.getContentType());
+        copy.setCreationTime(source.getCreationTime());
+        copy.setGroupId(source.getGroupId());
+        copy.setGroupSequence(source.getGroupSequence());
+        copy.setMessageId(source.getMessageId());
+        copy.setReplyTo(source.getReplyTo());
+        copy.setReplyToGroupId(source.getReplyToGroupId());
+        copy.setSubject(source.getSubject());
+        copy.setTo(source.getTo());
+        copy.setUserId(source.getUserId());
+        return copy;
+    }
+
+    /** Returns a new HashMap holding the entries of the given map, or null when the map is null. */
+    private static <K> Map<K, Object> copyMap(final Map<K, Object> source)
+    {
+        if (source == null)
+        {
+            return null;
+        }
+        return new HashMap<>(source);
+    }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorFactory.java
similarity index 54%
copy from broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
copy to broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorFactory.java
index b2ac9d7..82ba664 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorFactory.java
@@ -18,22 +18,25 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.server.model.ManagedAttribute;
-import org.apache.qpid.server.model.ManagedContextDefault;
-import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
 
-@ManagedObject( category = false, type="priority",
-        amqpName = "org.apache.qpid.PriorityQueue" )
-public interface PriorityQueue<X extends PriorityQueue<X>> extends Queue<X>
+@PluggableService
+public class Message_1_0_MutatorFactory implements ServerMessageMutatorFactory<Message_1_0>
 {
-    String PRIORITIES = "priorities";
+    @Override
+    public ServerMessageMutator<Message_1_0> create(final Message_1_0 serverMessage, final MessageStore messageStore)
+    {
+        return new Message_1_0_Mutator(serverMessage, messageStore); // a new mutator for the given message and store
+    }
 
-    @ManagedContextDefault( name = "queue.priorities")
-    int DEFAULT_PRIORITY_LEVELS = 10;
-
-    @ManagedAttribute( defaultValue = "${queue.priorities}")
-    int getPriorities();
+    @Override
+    public String getType()
+    {
+        return Message_1_0.class.getName(); // the type is the fully-qualified name of the Message_1_0 class
+    }
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorTest.java
new file mode 100644
index 0000000..664715c
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorTest.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class Message_1_0_MutatorTest extends UnitTestBase
+{
+    private static final byte TEST_PRIORITY = (byte) 1;
+    private static final byte UPDATED_PRIORITY = (byte) (TEST_PRIORITY + 1);
+    private static final String TEST_HEADER_NAME = "foo";
+    private static final String TEST_HEADER_VALUE = "bar";
+    private static final String TEST_CONTENT_TYPE = "text/plain";
+    private static final String TEST_CONTENT = "testContent";
+
+    private MessageStore _messageStore;
+    private Message_1_0_Mutator _messageMutator;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        // The store must exist before the test message is created: createTestMessage() adds to it.
+        _messageStore = new TestMemoryMessageStore();
+        _messageMutator = new Message_1_0_Mutator(createTestMessage(), _messageStore);
+    }
+
+    @After
+    public void tearDown()
+    {
+        _messageStore.closeMessageStore();
+    }
+
+    @Test
+    public void setPriority()
+    {
+        _messageMutator.setPriority(UPDATED_PRIORITY);
+
+        assertThat(_messageMutator.getPriority(), is(equalTo(UPDATED_PRIORITY)));
+    }
+
+    @Test
+    public void getPriority()
+    {
+        // Without any mutation the mutator reports the priority of the original message.
+        assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY)));
+    }
+
+    @Test
+    public void create() throws Exception
+    {
+        _messageMutator.setPriority(UPDATED_PRIORITY);
+
+        final Message_1_0 mutated = _messageMutator.create();
+
+        // Only the priority changes; content type and application header are carried over.
+        assertThat(mutated.getMessageHeader().getPriority(), is(equalTo(UPDATED_PRIORITY)));
+        assertThat(mutated.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE)));
+        assertThat(mutated.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+        // The body must still decode to exactly one section whose value is the original text.
+        final QpidByteBuffer body = mutated.getContent();
+        final SectionDecoderImpl decoder =
+                new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
+        final List<EncodingRetainingSection<?>> bodySections = decoder.parseAll(body);
+        assertThat(bodySections.size(), is(equalTo(1)));
+
+        final Object decodedValue = bodySections.get(0).getValue();
+        assertThat(decodedValue, is(equalTo(TEST_CONTENT)));
+    }
+
+    private Message_1_0 createTestMessage()
+    {
+        // Body: one AMQP value section; the size of its encoded form is recorded in the metadata.
+        final QpidByteBuffer body = new AmqpValue(TEST_CONTENT).createEncodingRetainingSection().getEncodedForm();
+        final long bodySize = body.remaining();
+
+        final Header header = new Header();
+        header.setPriority(UnsignedByte.valueOf(TEST_PRIORITY));
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf(TEST_CONTENT_TYPE));
+        final ApplicationProperties applicationProperties =
+                new ApplicationProperties(Collections.singletonMap(TEST_HEADER_NAME, TEST_HEADER_VALUE));
+        final HeaderSection headerSection = header.createEncodingRetainingSection();
+        final PropertiesSection propertiesSection = properties.createEncodingRetainingSection();
+        final ApplicationPropertiesSection applicationPropertiesSection =
+                applicationProperties.createEncodingRetainingSection();
+        // Delivery annotations, message annotations and footer are passed as null.
+        final MessageMetaData_1_0 metaData = new MessageMetaData_1_0(
+                headerSection, null, null, propertiesSection, applicationPropertiesSection, null,
+                System.currentTimeMillis(), bodySize);
+
+        final MessageHandle<MessageMetaData_1_0> handle = _messageStore.addMessage(metaData);
+        handle.addContent(body);
+        return new Message_1_0(handle.allContentAdded());
+    }
+}
diff --git a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java
index ae992c5..a786784 100644
--- a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java
+++ b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java
@@ -21,10 +21,12 @@
 
 package org.apache.qpid.tests.http.endtoend.message;
 
+import static javax.servlet.http.HttpServletResponse.SC_CREATED;
 import static javax.servlet.http.HttpServletResponse.SC_OK;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
 
@@ -42,9 +44,12 @@ import java.util.stream.Stream;
 
 import javax.servlet.http.HttpServletResponse;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.queue.PriorityQueue;
 import org.apache.qpid.tests.http.HttpRequestConfig;
 import org.apache.qpid.tests.http.HttpTestBase;
 
@@ -207,6 +212,99 @@ public class MessageManagementTest extends HttpTestBase
         assertThat(getBrokerAdmin().getQueueDepthMessages(SOURCE_QUEUE_NAME), is(equalTo(0)));
     }
 
+    @Test
+    public void testReenqueueMessageForPriorityChange() throws Exception
+    {
+        final String queueName = "priorityQueue";
+        createPriorityQueue(queueName, 10);
+        publishPriorityMessage(queueName, "1", 5);
+        publishPriorityMessage(queueName, "2", 6);
+        publishPriorityMessage(queueName, "3", 1);
+
+        // Before the change the queue lists the messages by descending priority: "2" (6), "1" (5), "3" (1).
+        final List<Map<String, Object>> messages =
+                getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+        assertThat(messages.size(), is(equalTo(3)));
+        final Map<String, Object> message1 = messages.get(0);
+        final Map<String, Object> message2 = messages.get(1);
+        final Map<String, Object> message3 = messages.get(2);
+        assertThat(message1.get("messageId"), is(equalTo("2")));
+        assertThat(message2.get("messageId"), is(equalTo("1")));
+        assertThat(message3.get("messageId"), is(equalTo("3")));
+
+        // Raise the lowest-priority message ("3") to priority 10 through the management operation.
+        final Map<String, Object> parameters = new HashMap<>();
+        parameters.put("messageId", message3.get("id"));
+        parameters.put("newPriority", 10);
+        Long result = getHelper().postJson(String.format("queue/%s/reenqueueMessageForPriorityChange", queueName),
+                                           parameters,
+                                           new TypeReference<Long>()
+                                           {
+                                           },
+                                           HttpServletResponse.SC_OK);
+        assertThat(result, is(not(equalTo(-1L))));
+
+        // Check the list fetched AFTER the change; asserting on the earlier snapshot would verify nothing.
+        final List<Map<String, Object>> messages2 =
+                getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+        assertThat(messages2.size(), is(equalTo(3)));
+        final Map<String, Object> message1AfterChange = messages2.get(0);
+        final Map<String, Object> message2AfterChange = messages2.get(1);
+        final Map<String, Object> message3AfterChange = messages2.get(2);
+        assertThat(message1AfterChange.get("messageId"), is(equalTo("3")));
+        assertThat(message2AfterChange.get("messageId"), is(equalTo("2")));
+        assertThat(message3AfterChange.get("messageId"), is(equalTo("1")));
+        assertThat(message1AfterChange.get("priority"), is(equalTo(10)));
+    }
+
+    @Test
+    public void testReenqueueMessagesForPriorityChange() throws Exception
+    {
+        final String queueName = "priorityQueue";
+        createPriorityQueue(queueName, 10);
+        publishPriorityMessage(queueName, "1", 5);
+        publishPriorityMessage(queueName, "2", 6);
+        publishPriorityMessage(queueName, "3", 1);
+
+        // Before the change the queue lists the messages by descending priority: "2" (6), "1" (5), "3" (1).
+        final List<Map<String, Object>> messages =
+                getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+        assertThat(messages.size(), is(equalTo(3)));
+        final Map<String, Object> message1 = messages.get(0);
+        final Map<String, Object> message2 = messages.get(1);
+        final Map<String, Object> message3 = messages.get(2);
+        assertThat(message1.get("messageId"), is(equalTo("2")));
+        assertThat(message2.get("messageId"), is(equalTo("1")));
+        assertThat(message3.get("messageId"), is(equalTo("3")));
+
+        // Select messages "3" and "1" through their "id" header and raise both to priority 10.
+        final Map<String, Object> parameters = new HashMap<>();
+        parameters.put("selector", String.format("id in ('%s', '%s')",
+                                                 message3.get("messageId"),
+                                                 message2.get("messageId")));
+        parameters.put("newPriority", 10);
+        final List<Long> result =
+                getHelper().postJson(String.format("queue/%s/reenqueueMessagesForPriorityChange", queueName),
+                                     parameters,
+                                     new TypeReference<List<Long>>()
+                                     {
+                                     },
+                                     HttpServletResponse.SC_OK);
+        assertThat(result.size(), is(equalTo(2)));
+
+        // Check the list fetched AFTER the change; asserting on the earlier snapshot would verify nothing.
+        final List<Map<String, Object>> messages2 =
+                getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+        assertThat(messages2.size(), is(equalTo(3)));
+        final Map<String, Object> message1AfterChange = messages2.get(0);
+        final Map<String, Object> message2AfterChange = messages2.get(1);
+        final Map<String, Object> message3AfterChange = messages2.get(2);
+        assertThat(message1AfterChange.get("messageId"), is(equalTo("1")));
+        assertThat(message2AfterChange.get("messageId"), is(equalTo("3")));
+        assertThat(message3AfterChange.get("messageId"), is(equalTo("2")));
+        assertThat(message1AfterChange.get("priority"), is(equalTo(10)));
+        assertThat(message2AfterChange.get("priority"), is(equalTo(10)));
+    }
 
     private List<Map<String, Object>> getMessageDetails(final String queueName) throws IOException
     {
@@ -242,4 +340,25 @@ public class MessageManagementTest extends HttpTestBase
                                   SC_OK);
     }
 
+    private void publishPriorityMessage(final String queueName, final String messageId, int priority) throws Exception
+    {
+        // The id is sent twice: as the message id and as an "id" header.
+        final Map<String, Object> message = new HashMap<>();
+        message.put("address", queueName);
+        message.put("messageId", messageId);
+        message.put("headers", Collections.singletonMap("id", messageId));
+        message.put("priority", priority);
+
+        // POST the message as the "message" argument of the virtual host's publishMessage operation.
+        final Map<String, Object> arguments = Collections.singletonMap("message", message);
+        getHelper().submitRequest("virtualhost/publishMessage", "POST", arguments, SC_OK);
+    }
+
+    private void createPriorityQueue(final String queueName, int priorities) throws IOException
+    {
+        final Map<String, Object> attributes = new HashMap<>();  // attributes of the queue to be created
+        attributes.put(ConfiguredObject.TYPE, "priority");
+        attributes.put(PriorityQueue.PRIORITIES, priorities);
+        getHelper().submitRequest(String.format("queue/%s", queueName), "PUT", attributes, SC_CREATED);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org