You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/18 10:45:02 UTC

svn commit: r1302105 - in /qpid/branches/java-config-and-management/qpid/java: broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ broker/src/main/java/org/apache/qpid/server/model/ broker/src/main/java/org/apache/qpid/server/model/adap...

Author: rgodfrey
Date: Sun Mar 18 09:45:01 2012
New Revision: 1302105

URL: http://svn.apache.org/viewvc?rev=1302105&view=rev
Log:
NO-JIRA : [Java Config] Added move/copy/delete messages from queue to MBeans

Modified:
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java Sun Mar 18 09:45:01 2012
@@ -197,9 +197,4 @@ abstract class AbstractStatisticsGatheri
         return _bytesReceived;
     }
 
-    public synchronized boolean isStatisticsEnabled()
-    {
-        updateStats();
-        return false;  //TODO
-    }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java Sun Mar 18 09:45:01 2012
@@ -155,18 +155,24 @@ public class ConnectionMBean extends Abs
 
     public void rollbackTransactions(int channelId) throws JMException
     {
-        // TODO - rollin back a transaction on a channel makes *no* sense
+        // TODO - rolling back a transaction on a channel makes *no* sense
     }
 
     public void closeConnection() throws Exception
     {
-        // TODO - Implement close connection
+        getConfiguredObject().delete();
     }
 
 
+    public synchronized boolean isStatisticsEnabled()
+    {
+        updateStats();
+        return false;  //TODO
+    }
+
     public void setStatisticsEnabled(boolean enabled)
     {
-        // TODO - Implement setStatisticsEnables
+        // TODO - Implement setStatisticsEnabled
         updateStats();
     }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java Sun Mar 18 09:45:01 2012
@@ -297,11 +297,27 @@ public class ExchangeMBean extends AMQMa
         _exchange.createBinding(binding, queue, arguments, Collections.EMPTY_MAP);
     }
 
-    public void removeBinding(
-            @MBeanOperationParameter(name = ManagedQueue.TYPE, description = "Queue name") String queueName,
-            @MBeanOperationParameter(name = "Binding", description = "Binding key") String binding)
+    public void removeBinding(String queueName, String bindingKey)
             throws IOException, JMException
     {
-        // TODO
+        VirtualHost vhost = _exchange.getParent(VirtualHost.class);
+        for(Queue aQueue : vhost.getQueues())
+        {
+            if(aQueue.getName().equals(queueName))
+            {
+                // Found the named queue: delete each binding on this exchange for it with the given key.
+                for(Binding binding : _exchange.getBindings())
+                {
+                    if(aQueue.equals(binding.getParent(Queue.class)) && bindingKey.equals(binding.getName()))
+                    {
+                        binding.delete();
+                    }
+                }
+                return;
+            }
+        }
+
+        // No queue on the virtual host has this name: report it rather than dereference a null queue.
+        throw new JMException("No such queue \"" + queueName + "\"");
     }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java Sun Mar 18 09:45:01 2012
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.JMException;
 import javax.management.ObjectName;
 import javax.management.OperationsException;
@@ -46,6 +47,7 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 
@@ -383,33 +385,202 @@ public class QueueMBean extends AMQManag
         return visitor.getEntry();
 
     }
-
+    
     public void deleteMessageFromTop() throws IOException, JMException
     {
-        // TODO
+        VirtualHost vhost = _queue.getParent(VirtualHost.class);
+        vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+        {
+            public void withinTransaction(final VirtualHost.Transaction txn)
+            {
+                _queue.visit(new QueueEntryVisitor()
+                {
+
+                    public boolean visit(final QueueEntry entry)
+                    {
+                        if(entry.acquire())
+                        {
+                            txn.dequeue(entry);
+                            return true;
+                        }
+                        return false;
+                    }
+                });
+
+            }
+        });
+
     }
 
     public Long clearQueue() throws IOException, JMException
     {
-        return null;  // TODO
+        VirtualHost vhost = _queue.getParent(VirtualHost.class);
+        final AtomicLong count = new AtomicLong();
+
+        vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+        {
+            public void withinTransaction(final VirtualHost.Transaction txn)
+            {
+                _queue.visit(new QueueEntryVisitor()
+                {
+
+                    public boolean visit(final QueueEntry entry)
+                    {
+                        final ServerMessage message = entry.getMessage();
+                        if(message != null)
+                        {
+                            txn.dequeue(entry);
+                            count.incrementAndGet();
+
+                        }
+                        return false;
+                    }
+                });
+
+            }
+        });
+        return count.get();
     }
 
-    public void moveMessages(long fromMessageId, long toMessageId, String toQueue)
+    public void moveMessages(final long fromMessageId, final long toMessageId, String toQueue)
             throws IOException, JMException
     {
-        // TODO
+        if ((fromMessageId > toMessageId) || (fromMessageId < 1))
+        {
+            throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
+        }
+
+        VirtualHost vhost = _queue.getParent(VirtualHost.class);
+        Queue destinationQueue = null;
+        for(Queue q : vhost.getQueues())
+        {
+            if(q.getName().equals(toQueue))
+            {
+                destinationQueue = q;
+                break;
+            }
+        }
+        if(destinationQueue == null)
+        {
+            throw new OperationsException("No such queue \""+toQueue+"\"");
+        }
+
+        final Queue queue = destinationQueue;
+
+
+        vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+        {
+            public void withinTransaction(final VirtualHost.Transaction txn)
+            {
+                _queue.visit(new QueueEntryVisitor()
+                {
+
+                    public boolean visit(final QueueEntry entry)
+                    {
+                        final ServerMessage message = entry.getMessage();
+                        if(message != null)
+                        {
+                            final long messageId = message.getMessageNumber();
+
+                            if ((messageId >= fromMessageId)
+                                && (messageId <= toMessageId))
+                            {
+                                txn.move(entry, queue);
+                            }
+
+                        }
+                        return false;
+                    }
+                });
+            }
+        });
     }
 
-    public void deleteMessages(long fromMessageId, long toMessageId)
+    public void deleteMessages(final long fromMessageId, final long toMessageId)
             throws IOException, JMException
     {
-        // TODO
+        VirtualHost vhost = _queue.getParent(VirtualHost.class);
+        vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+        {
+            public void withinTransaction(final VirtualHost.Transaction txn)
+            {
+                _queue.visit(new QueueEntryVisitor()
+                {
+                    // Dequeues each entry whose message number lies in [fromMessageId, toMessageId].
+                    public boolean visit(final QueueEntry entry)
+                    {
+                        final ServerMessage message = entry.getMessage();
+                        if(message != null)
+                        {
+                            final long messageId = message.getMessageNumber();
+
+                            if ((messageId >= fromMessageId)
+                                && (messageId <= toMessageId))
+                            {
+                                txn.dequeue(entry);
+                            }
+                        }
+                        // Keep visiting (false): as in clearQueue/moveMessages/copyMessages every entry
+                        // must be seen; returning true, as deleteMessageFromTop does, ends after one.
+                        return false;
+                    }
+                });
+            }
+        });
     }
 
-    public void copyMessages(long fromMessageId, long toMessageId, String toQueue)
+    public void copyMessages(final long fromMessageId, final long toMessageId, String toQueue)
             throws IOException, JMException
     {
-        // TODO
+        if ((fromMessageId > toMessageId) || (fromMessageId < 1))
+        {
+            throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
+        }
+
+        VirtualHost vhost = _queue.getParent(VirtualHost.class);
+        Queue destinationQueue = null;
+        for(Queue q : vhost.getQueues())
+        {
+            if(q.getName().equals(toQueue))
+            {
+                destinationQueue = q;
+                break;
+            }
+        }
+        if(destinationQueue == null)
+        {
+            throw new OperationsException("No such queue \""+toQueue+"\"");
+        }
+
+        final Queue queue = destinationQueue;
+
+
+        vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+        {
+            public void withinTransaction(final VirtualHost.Transaction txn)
+            {
+                _queue.visit(new QueueEntryVisitor()
+                {
+
+                    public boolean visit(final QueueEntry entry)
+                    {
+                        final ServerMessage message = entry.getMessage();
+                        if(message != null)
+                        {
+                            final long messageId = message.getMessageNumber();
+
+                            if ((messageId >= fromMessageId)
+                                && (messageId <= toMessageId))
+                            {
+                                txn.copy(entry, queue);
+                            }
+
+                        }
+                        return false;
+                    }
+                });
+            }
+        });
     }
 
     private List<QueueEntry> getMessages(final long first, final long last)
@@ -417,7 +588,8 @@ public class QueueMBean extends AMQManag
         final List<QueueEntry> messages = new ArrayList<QueueEntry>((int)(last-first)+1);
         _queue.visit(new QueueEntryVisitor()
         {
-            private long position = 0;
+            private long position = 1;
+
             public boolean visit(QueueEntry entry)
             {
                 if(position >= first && position <= last)

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java Sun Mar 18 09:45:01 2012
@@ -37,6 +37,7 @@ import org.apache.qpid.management.common
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
 import org.apache.qpid.server.jmx.ManagedObject;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
@@ -122,10 +123,22 @@ public class VirtualHostManagerMBean ext
 
     }
 
-    public void unregisterExchange(
-            @MBeanOperationParameter(name = ManagedExchange.TYPE, description = "Exchange Name") String exchange)
+    public void unregisterExchange(String exchangeName)
             throws IOException, JMException, MBeanException
     {
+        Exchange theExchange = null;
+        for(Exchange exchange : _virtualHostMBean.getVirtualHost().getExchanges())
+        {
+            if(exchange.getName().equals(exchangeName))
+            {
+                theExchange = exchange;
+                break;
+            }
+        }
+        if(theExchange != null)
+        {
+            theExchange.delete();
+        }
         //TODO
     }
 
@@ -156,4 +169,11 @@ public class VirtualHostManagerMBean ext
         return getObjectNameForSingleInstanceMBean();
     }
 
+
+    public synchronized boolean isStatisticsEnabled()
+    {
+        updateStats();
+        return false;  //TODO
+    }
+
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java Sun Mar 18 09:45:01 2012
@@ -25,4 +25,6 @@ import java.util.Map;
 public interface Binding extends ConfiguredObject
 {
     Map<String,Object> getArguments();
+
+    void delete();
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java Sun Mar 18 09:45:01 2012
@@ -57,4 +57,6 @@ public interface Connection extends Conf
 
     //children
     Collection<Session> getSessions();
+
+    void delete();
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java Sun Mar 18 09:45:01 2012
@@ -43,4 +43,6 @@ public interface Exchange extends Config
 
 
     // Statistics
+
+    void delete();
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java Sun Mar 18 09:45:01 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.model;
 
+import org.apache.qpid.server.queue.QueueEntry;
 import java.security.AccessControlException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -70,4 +71,21 @@ public interface VirtualHost extends Con
                     throws AccessControlException, IllegalArgumentException;
 
     void deleteQueue(Queue queue) throws AccessControlException, IllegalStateException;
+
+    public static interface Transaction
+    {
+        void dequeue(QueueEntry entry);
+
+        void copy(QueueEntry entry, Queue queue);
+        
+        void move(QueueEntry entry, Queue queue);
+
+    }
+
+    public static interface TransactionalOperation
+    {
+        void withinTransaction(Transaction txn);
+    }
+
+    void executeTransaction(TransactionalOperation op);
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java Sun Mar 18 09:45:01 2012
@@ -24,6 +24,8 @@ import java.security.AccessControlExcept
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -118,4 +120,20 @@ final class BindingAdapter extends Abstr
     {
         return new HashMap<String, Object> (_binding.getArguments());
     }
+
+    public void delete()
+    {
+        try
+        {
+            _queue.getAMQQueue().getVirtualHost().getBindingFactory().removeBinding(_binding);
+        }
+        catch(AMQSecurityException e)  // rethrown as AccessControlException with e kept as the cause
+        {
+            throw (AccessControlException) new AccessControlException(e.getMessage()).initCause(e);
+        }
+        catch(AMQInternalException e)
+        {
+            throw new IllegalStateException(e);
+        }
+    }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Sun Mar 18 09:45:01 2012
@@ -28,6 +28,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Session;
@@ -78,6 +81,18 @@ final class ConnectionAdapter extends Ab
         }
     }
 
+    public void delete()
+    {
+        try
+        {
+            _connection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+        }
+        catch(AMQException e)
+        {
+            throw new IllegalStateException(e);
+        }
+    }
+
     public String getName()
     {
         return _connection.getRemoteAddressString();

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Sun Mar 18 09:45:01 2012
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.binding.Binding;
@@ -140,6 +141,18 @@ final class ExchangeAdapter extends Abst
         }
     }
 
+    public void delete()
+    {
+        try
+        {
+            _vhost.getVirtualHost().getExchangeRegistry().unregisterExchange(getName(), false);
+        }
+        catch(AMQException e)
+        {
+            throw new IllegalStateException(e);
+        }
+    }
+
     public String getName()
     {
         return _exchange.getName();
@@ -226,4 +239,9 @@ final class ExchangeAdapter extends Abst
             childRemoved(adapter);
         }
     }
+
+    org.apache.qpid.server.exchange.Exchange getExchange()
+    {
+        return _exchange;
+    }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Sun Mar 18 09:45:01 2012
@@ -35,6 +35,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -46,8 +47,12 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 
 final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener,
                                                                   QueueRegistry.RegistryChangeListener,
@@ -443,6 +448,102 @@ final class VirtualHostAdapter extends A
         throw new UnsupportedOperationException("Not Yet Implemented");
     }
 
+    public void executeTransaction(TransactionalOperation op)
+    {
+        MessageStore store = _virtualHost.getMessageStore();
+        final LocalTransaction txn = new LocalTransaction(store);
+
+        op.withinTransaction(new Transaction()
+        {
+            public void dequeue(final QueueEntry entry)
+            {
+                txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action()
+                {
+                    public void postCommit()
+                    {
+                        entry.discard();
+                    }
+
+                    public void onRollback()
+                    {
+                    }
+                });
+            }
+
+            public void copy(QueueEntry entry, Queue queue)
+            {
+                final ServerMessage message = entry.getMessage();
+                final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
+
+                txn.enqueue(toQueue, message, new ServerTransaction.Action()
+                {
+                    public void postCommit()
+                    {
+                        try
+                        {
+                            toQueue.enqueue(message);
+                        }
+                        catch(AMQException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    public void onRollback()
+                    {
+                    }
+                });
+
+            }
+
+            public void move(final QueueEntry entry, Queue queue)
+            {
+                final ServerMessage message = entry.getMessage();
+                final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
+                if(entry.acquire())
+                {
+                    txn.enqueue(toQueue, message,
+                                new ServerTransaction.Action()
+                                {
+
+                                    public void postCommit()
+                                    {
+                                        try
+                                        {
+                                            toQueue.enqueue(message);
+                                        }
+                                        catch (AMQException e)
+                                        {
+                                            throw new RuntimeException(e);
+                                        }
+                                    }
+
+                                    public void onRollback()
+                                    {
+                                        entry.release();
+                                    }
+                                });
+                    txn.dequeue(entry.getQueue(), message,
+                                new ServerTransaction.Action()
+                                {
+
+                                    public void postCommit()
+                                    {
+                                        entry.discard();
+                                    }
+
+                                    public void onRollback()
+                                    {
+
+                                    }
+                                });
+                }
+            }
+
+        });
+        txn.commit();
+    }
+
     org.apache.qpid.server.virtualhost.VirtualHost getVirtualHost()
     {
         return _virtualHost;

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java Sun Mar 18 09:45:01 2012
@@ -90,6 +90,11 @@ class ExchangeImpl extends AbstractConfi
         return null;  // TODO - Implement
     }
 
+    public void delete()
+    {
+        // TODO - Implement
+    }
+
     public State getActualState()
     {
         State vhostState = _virtualHost.getActualState();

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java Sun Mar 18 09:45:01 2012
@@ -216,7 +216,6 @@ class VirtualHostImpl extends AbstractCo
         }
     }
 
-    @Override
     public void deleteQueue(Queue queue)
     {
         synchronized (getLock())
@@ -230,4 +229,9 @@ class VirtualHostImpl extends AbstractCo
             notifyChildRemovedListener(queue);
         }
     }
+
+    public void executeTransaction(TransactionalOperation op)
+    {
+        // TODO - Implement
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org