You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/18 10:45:02 UTC
svn commit: r1302105 - in
/qpid/branches/java-config-and-management/qpid/java:
broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/
broker/src/main/java/org/apache/qpid/server/model/
broker/src/main/java/org/apache/qpid/server/model/adap...
Author: rgodfrey
Date: Sun Mar 18 09:45:01 2012
New Revision: 1302105
URL: http://svn.apache.org/viewvc?rev=1302105&view=rev
Log:
NO-JIRA : [Java Config] Added move/copy/delete messages from queue to MBeans
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/AbstractStatisticsGatheringMBean.java Sun Mar 18 09:45:01 2012
@@ -197,9 +197,4 @@ abstract class AbstractStatisticsGatheri
return _bytesReceived;
}
- public synchronized boolean isStatisticsEnabled()
- {
- updateStats();
- return false; //TODO
- }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java Sun Mar 18 09:45:01 2012
@@ -155,18 +155,24 @@ public class ConnectionMBean extends Abs
public void rollbackTransactions(int channelId) throws JMException
{
- // TODO - rollin back a transaction on a channel makes *no* sense
+ // TODO - rolling back a transaction on a channel makes *no* sense
}
public void closeConnection() throws Exception
{
- // TODO - Implement close connection
+ getConfiguredObject().delete();
}
+ public synchronized boolean isStatisticsEnabled()
+ {
+ updateStats();
+ return false; //TODO
+ }
+
public void setStatisticsEnabled(boolean enabled)
{
- // TODO - Implement setStatisticsEnables
+ // TODO - Implement setStatisticsEnabled
updateStats();
}
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java Sun Mar 18 09:45:01 2012
@@ -297,11 +297,27 @@ public class ExchangeMBean extends AMQMa
_exchange.createBinding(binding, queue, arguments, Collections.EMPTY_MAP);
}
- public void removeBinding(
- @MBeanOperationParameter(name = ManagedQueue.TYPE, description = "Queue name") String queueName,
- @MBeanOperationParameter(name = "Binding", description = "Binding key") String binding)
+ public void removeBinding(String queueName, String bindingKey)
throws IOException, JMException
{
- // TODO
+ Queue queue = null;
+ VirtualHost vhost = _exchange.getParent(VirtualHost.class);
+ for(Queue aQueue : vhost.getQueues())
+ {
+ if(aQueue.getName().equals(queueName))
+ {
+ queue = aQueue;
+ break;
+ }
+ }
+
+ for(Binding binding : _exchange.getBindings())
+ {
+ if(queue.equals(binding.getParent(Queue.class)) && bindingKey.equals(binding.getName()))
+ {
+ binding.delete();
+ }
+ }
+
}
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java Sun Mar 18 09:45:01 2012
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
import javax.management.ObjectName;
import javax.management.OperationsException;
@@ -46,6 +47,7 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
@@ -383,33 +385,202 @@ public class QueueMBean extends AMQManag
return visitor.getEntry();
}
-
+
public void deleteMessageFromTop() throws IOException, JMException
{
- // TODO
+ VirtualHost vhost = _queue.getParent(VirtualHost.class);
+ vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+ {
+ public void withinTransaction(final VirtualHost.Transaction txn)
+ {
+ _queue.visit(new QueueEntryVisitor()
+ {
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(entry.acquire())
+ {
+ txn.dequeue(entry);
+ return true;
+ }
+ return false;
+ }
+ });
+
+ }
+ });
+
}
public Long clearQueue() throws IOException, JMException
{
- return null; // TODO
+ VirtualHost vhost = _queue.getParent(VirtualHost.class);
+ final AtomicLong count = new AtomicLong();
+
+ vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+ {
+ public void withinTransaction(final VirtualHost.Transaction txn)
+ {
+ _queue.visit(new QueueEntryVisitor()
+ {
+
+ public boolean visit(final QueueEntry entry)
+ {
+ final ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ txn.dequeue(entry);
+ count.incrementAndGet();
+
+ }
+ return false;
+ }
+ });
+
+ }
+ });
+ return count.get();
}
- public void moveMessages(long fromMessageId, long toMessageId, String toQueue)
+ public void moveMessages(final long fromMessageId, final long toMessageId, String toQueue)
throws IOException, JMException
{
- // TODO
+ if ((fromMessageId > toMessageId) || (fromMessageId < 1))
+ {
+ throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
+ }
+
+ VirtualHost vhost = _queue.getParent(VirtualHost.class);
+ Queue destinationQueue = null;
+ for(Queue q : vhost.getQueues())
+ {
+ if(q.getName().equals(toQueue))
+ {
+ destinationQueue = q;
+ break;
+ }
+ }
+ if(destinationQueue == null)
+ {
+ throw new OperationsException("No such queue \""+toQueue+"\"");
+ }
+
+ final Queue queue = destinationQueue;
+
+
+ vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+ {
+ public void withinTransaction(final VirtualHost.Transaction txn)
+ {
+ _queue.visit(new QueueEntryVisitor()
+ {
+
+ public boolean visit(final QueueEntry entry)
+ {
+ final ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ final long messageId = message.getMessageNumber();
+
+ if ((messageId >= fromMessageId)
+ && (messageId <= toMessageId))
+ {
+ txn.move(entry, queue);
+ }
+
+ }
+ return false;
+ }
+ });
+ }
+ });
}
- public void deleteMessages(long fromMessageId, long toMessageId)
+ public void deleteMessages(final long fromMessageId, final long toMessageId)
throws IOException, JMException
{
- // TODO
+ VirtualHost vhost = _queue.getParent(VirtualHost.class);
+ vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+ {
+ public void withinTransaction(final VirtualHost.Transaction txn)
+ {
+ _queue.visit(new QueueEntryVisitor()
+ {
+
+ public boolean visit(final QueueEntry entry)
+ {
+ final ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ final long messageId = message.getMessageNumber();
+
+ if ((messageId >= fromMessageId)
+ && (messageId <= toMessageId))
+ {
+ txn.dequeue(entry);
+ return true;
+ }
+ return false;
+ }
+ return true;
+ }
+ });
+ }
+ });
}
- public void copyMessages(long fromMessageId, long toMessageId, String toQueue)
+ public void copyMessages(final long fromMessageId, final long toMessageId, String toQueue)
throws IOException, JMException
{
- // TODO
+ if ((fromMessageId > toMessageId) || (fromMessageId < 1))
+ {
+ throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
+ }
+
+ VirtualHost vhost = _queue.getParent(VirtualHost.class);
+ Queue destinationQueue = null;
+ for(Queue q : vhost.getQueues())
+ {
+ if(q.getName().equals(toQueue))
+ {
+ destinationQueue = q;
+ break;
+ }
+ }
+ if(destinationQueue == null)
+ {
+ throw new OperationsException("No such queue \""+toQueue+"\"");
+ }
+
+ final Queue queue = destinationQueue;
+
+
+ vhost.executeTransaction(new VirtualHost.TransactionalOperation()
+ {
+ public void withinTransaction(final VirtualHost.Transaction txn)
+ {
+ _queue.visit(new QueueEntryVisitor()
+ {
+
+ public boolean visit(final QueueEntry entry)
+ {
+ final ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ final long messageId = message.getMessageNumber();
+
+ if ((messageId >= fromMessageId)
+ && (messageId <= toMessageId))
+ {
+ txn.copy(entry, queue);
+ }
+
+ }
+ return false;
+ }
+ });
+ }
+ });
}
private List<QueueEntry> getMessages(final long first, final long last)
@@ -417,7 +588,8 @@ public class QueueMBean extends AMQManag
final List<QueueEntry> messages = new ArrayList<QueueEntry>((int)(last-first)+1);
_queue.visit(new QueueEntryVisitor()
{
- private long position = 0;
+ private long position = 1;
+
public boolean visit(QueueEntry entry)
{
if(position >= first && position <= last)
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java Sun Mar 18 09:45:01 2012
@@ -37,6 +37,7 @@ import org.apache.qpid.management.common
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
import org.apache.qpid.server.jmx.ManagedObject;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
@@ -122,10 +123,22 @@ public class VirtualHostManagerMBean ext
}
- public void unregisterExchange(
- @MBeanOperationParameter(name = ManagedExchange.TYPE, description = "Exchange Name") String exchange)
+ public void unregisterExchange(String exchangeName)
throws IOException, JMException, MBeanException
{
+ Exchange theExchange = null;
+ for(Exchange exchange : _virtualHostMBean.getVirtualHost().getExchanges())
+ {
+ if(exchange.getName().equals(exchangeName))
+ {
+ theExchange = exchange;
+ break;
+ }
+ }
+ if(theExchange != null)
+ {
+ theExchange.delete();
+ }
//TODO
}
@@ -156,4 +169,11 @@ public class VirtualHostManagerMBean ext
return getObjectNameForSingleInstanceMBean();
}
+
+ public synchronized boolean isStatisticsEnabled()
+ {
+ updateStats();
+ return false; //TODO
+ }
+
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java Sun Mar 18 09:45:01 2012
@@ -25,4 +25,6 @@ import java.util.Map;
public interface Binding extends ConfiguredObject
{
Map<String,Object> getArguments();
+
+ void delete();
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Connection.java Sun Mar 18 09:45:01 2012
@@ -57,4 +57,6 @@ public interface Connection extends Conf
//children
Collection<Session> getSessions();
+
+ void delete();
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java Sun Mar 18 09:45:01 2012
@@ -43,4 +43,6 @@ public interface Exchange extends Config
// Statistics
+
+ void delete();
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java Sun Mar 18 09:45:01 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.model;
+import org.apache.qpid.server.queue.QueueEntry;
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.Collection;
@@ -70,4 +71,21 @@ public interface VirtualHost extends Con
throws AccessControlException, IllegalArgumentException;
void deleteQueue(Queue queue) throws AccessControlException, IllegalStateException;
+
+ public static interface Transaction
+ {
+ void dequeue(QueueEntry entry);
+
+ void copy(QueueEntry entry, Queue queue);
+
+ void move(QueueEntry entry, Queue queue);
+
+ }
+
+ public static interface TransactionalOperation
+ {
+ void withinTransaction(Transaction txn);
+ }
+
+ void executeTransaction(TransactionalOperation op);
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java Sun Mar 18 09:45:01 2012
@@ -24,6 +24,8 @@ import java.security.AccessControlExcept
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -118,4 +120,20 @@ final class BindingAdapter extends Abstr
{
return new HashMap<String, Object> (_binding.getArguments());
}
+
+ public void delete()
+ {
+ try
+ {
+ _queue.getAMQQueue().getVirtualHost().getBindingFactory().removeBinding(_binding);
+ }
+ catch(AMQSecurityException e)
+ {
+ throw new AccessControlException(e.getMessage());
+ }
+ catch(AMQInternalException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Sun Mar 18 09:45:01 2012
@@ -28,6 +28,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Session;
@@ -78,6 +81,18 @@ final class ConnectionAdapter extends Ab
}
}
+ public void delete()
+ {
+ try
+ {
+ _connection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ }
+ catch(AMQException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
public String getName()
{
return _connection.getRemoteAddressString();
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Sun Mar 18 09:45:01 2012
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
@@ -140,6 +141,18 @@ final class ExchangeAdapter extends Abst
}
}
+ public void delete()
+ {
+ try
+ {
+ _vhost.getVirtualHost().getExchangeRegistry().unregisterExchange(getName(), false);
+ }
+ catch(AMQException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
public String getName()
{
return _exchange.getName();
@@ -226,4 +239,9 @@ final class ExchangeAdapter extends Abst
childRemoved(adapter);
}
}
+
+ org.apache.qpid.server.exchange.Exchange getExchange()
+ {
+ return _exchange;
+ }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Sun Mar 18 09:45:01 2012
@@ -35,6 +35,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -46,8 +47,12 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener,
QueueRegistry.RegistryChangeListener,
@@ -443,6 +448,102 @@ final class VirtualHostAdapter extends A
throw new UnsupportedOperationException("Not Yet Implemented");
}
+ public void executeTransaction(TransactionalOperation op)
+ {
+ MessageStore store = _virtualHost.getMessageStore();
+ final LocalTransaction txn = new LocalTransaction(store);
+
+ op.withinTransaction(new Transaction()
+ {
+ public void dequeue(final QueueEntry entry)
+ {
+ txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+
+ public void copy(QueueEntry entry, Queue queue)
+ {
+ final ServerMessage message = entry.getMessage();
+ final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
+
+ txn.enqueue(toQueue, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ try
+ {
+ toQueue.enqueue(message);
+ }
+ catch(AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+
+ }
+
+ public void move(final QueueEntry entry, Queue queue)
+ {
+ final ServerMessage message = entry.getMessage();
+ final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
+ if(entry.acquire())
+ {
+ txn.enqueue(toQueue, message,
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ try
+ {
+ toQueue.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ txn.dequeue(entry.getQueue(), message,
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+ }
+
+ });
+ txn.commit();
+ }
+
org.apache.qpid.server.virtualhost.VirtualHost getVirtualHost()
{
return _virtualHost;
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java Sun Mar 18 09:45:01 2012
@@ -90,6 +90,11 @@ class ExchangeImpl extends AbstractConfi
return null; // TODO - Implement
}
+ public void delete()
+ {
+ // TODO - Implement
+ }
+
public State getActualState()
{
State vhostState = _virtualHost.getActualState();
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java?rev=1302105&r1=1302104&r2=1302105&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java Sun Mar 18 09:45:01 2012
@@ -216,7 +216,6 @@ class VirtualHostImpl extends AbstractCo
}
}
- @Override
public void deleteQueue(Queue queue)
{
synchronized (getLock())
@@ -230,4 +229,9 @@ class VirtualHostImpl extends AbstractCo
notifyChildRemovedListener(queue);
}
}
+
+ public void executeTransaction(TransactionalOperation op)
+ {
+ // TODO - Implement
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org