You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/08/31 13:36:28 UTC

svn commit: r809544 [2/2] - in /qpid/branches/java-broker-0-10/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/flow/ ...

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Aug 31 11:36:26 2009
@@ -24,10 +24,7 @@
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
+import org.apache.qpid.server.exchange.*;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -39,14 +36,13 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.*;
 
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Collection;
 
 public class ServerSessionDelegate extends SessionDelegate
 {
@@ -90,7 +86,14 @@
     @Override
     public void messageAcquire(Session session, MessageAcquire method)
     {
-        super.messageAcquire(session, method);
+        RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
+
+        Acquired result = new Acquired(acquiredRanges);
+
+
+        session.executionResult((int) method.getId(), result);
+
+        
     }
 
     @Override
@@ -129,35 +132,60 @@
         else
         {
             String destination = method.getDestination();
-            String queueName = method.getQueue();
-            QueueRegistry queueRegistry = getQueueRegistry(session);
-
-
-            AMQQueue queue = queueRegistry.getQueue(queueName);
 
-            if(queue == null)
+            if(((ServerSession)session).getSubscription(destination)!=null)
             {
-                exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");    
+                exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destaination: '"+destination+"'");
             }
             else
             {
+                String queueName = method.getQueue();
+                QueueRegistry queueRegistry = getQueueRegistry(session);
 
-                FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
-
-                // TODO filters
 
-                Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+                AMQQueue queue = queueRegistry.getQueue(queueName);
 
-                ((ServerSession)session).register(destination, sub);
-                try
+                if(queue == null)
                 {
-                    queue.registerSubscription(sub, method.getExclusive());
+                    exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
                 }
-                catch (AMQException e)
+                else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
                 {
-                    // TODO
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    throw new RuntimeException(e);
+                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+                }
+                else
+                {
+
+                    FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+
+                    // TODO filters
+
+                    Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, 
+                                                                  destination,
+                                                                  method.getAcceptMode(),
+                                                                  method.getAcquireMode(),
+                                                                  MessageFlowMode.WINDOW,
+                                                                  creditManager, null);
+
+                    ((ServerSession)session).register(destination, sub);
+                    try
+                    {
+                        queue.registerSubscription(sub, method.getExclusive());
+                    }
+                    catch (AMQQueue.ExistingExclusiveSubscription existing)
+                    {
+                        exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
+                    }
+                    catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+                    {
+                        exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
+                    }
+                    catch (AMQException e)
+                    {
+                        // TODO
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                        throw new RuntimeException(e);
+                    }
                 }
             }
         }
@@ -172,18 +200,34 @@
         if(xfr.hasDestination())
         {
             exchange = exchangeRegistry.getExchange(xfr.getDestination());
+            if(exchange == null)
+            {
+                exchange = exchangeRegistry.getDefaultExchange();
+            }
         }
         else
         {
             exchange = exchangeRegistry.getDefaultExchange();
         }
 
-        MessageTransferMessage message = new MessageTransferMessage(xfr);
+        MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference());
+
+        DeliveryProperties delvProps = null;
+        if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+        {
+            delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
+        }
+
         try
         {
             ArrayList<AMQQueue> queues = exchange.route(message);
 
-            ((ServerSession) ssn).enqueue(message, queues);
+
+
+            if(queues != null)
+            {
+                ((ServerSession) ssn).enqueue(message, queues);
+            }
 
             ssn.processed(xfr);
         }
@@ -281,6 +325,10 @@
             else
             {
                 // TODO - check exchange has same properties
+                if(!exchange.getType().toString().equals(method.getType()))
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+                }
             }
 
         }
@@ -329,7 +377,10 @@
             }
             else
             {
-                // TODO check same as declared
+                if(!exchange.getType().toString().equals(method.getType()))
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+                }
             }
 
         }
@@ -343,7 +394,7 @@
         ex.setDescription(description);
 
         session.invoke(ex);
-        session.close();
+        //session.close();
     }
 
     private Exchange getExchange(Session session, String exchangeName)
@@ -431,12 +482,128 @@
     @Override
     public void exchangeBind(Session session, ExchangeBind method)
     {
-        super.exchangeBind(session, method);
+
+        VirtualHost virtualHost = getVirtualHost(session);
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+        if (!method.hasQueue())
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+        }
+        else if (!method.hasExchange())
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+        }
+/*
+        else if (!method.hasBindingKey())
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+        }
+*/
+        else
+        {
+            //TODO - here because of non-compliant python tests
+            if (!method.hasBindingKey())
+            {
+                method.setBindingKey(method.getQueue());    
+            }
+            AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+            Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+            if(queue == null)
+            {
+                exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+            }
+            else if(exchange == null)
+            {
+                exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+            }
+            else if (!virtualHost.getAccessManager().authoriseBind((ServerSession)session, exchange,
+                    queue, new AMQShortString(method.getBindingKey())))
+            {
+                exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange()
+                                                                           + "' to Queue: '" + method.getQueue()
+                                                                           + "' not allowed");
+            }
+            else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+            {
+                exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");    
+            }
+            else
+            {
+                try
+                {
+                    AMQShortString routingKey = new AMQShortString(method.getBindingKey());
+                    FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
+
+                    if (!exchange.isBound(routingKey, fieldTable, queue))
+                    {
+                        queue.bind(exchange, routingKey, fieldTable);
+                    }
+                    else
+                    {
+                        // todo
+                    }
+                }
+                catch (AMQException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+
+
+        }
+
+
+
     }
 
     @Override
     public void exchangeUnbind(Session session, ExchangeUnbind method)
     {
+                VirtualHost virtualHost = getVirtualHost(session);
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+        if (!method.hasQueue())
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+        }
+        else if (!method.hasExchange())
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+        }
+        else if (!method.hasBindingKey())
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+        }
+        else
+        {
+            AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+            Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+            if(queue == null)
+            {
+                exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+            }
+            else if(exchange == null)
+            {
+                exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+            }
+            else
+            {
+                try
+                {
+                    queue.unBind(exchange, new AMQShortString(method.getBindingKey()), null);
+                }
+                catch (AMQException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+
         super.exchangeUnbind(session, method);
     }
 
@@ -445,41 +612,65 @@
     {
 
         ExchangeBoundResult result = new ExchangeBoundResult();
+        Exchange exchange;
+        AMQQueue queue;
         if(method.hasExchange())
         {
-            Exchange exchange = getExchange(session, method.getExchange());
+            exchange = getExchange(session, method.getExchange());
 
             if(exchange == null)
             {
                 result.setExchangeNotFound(true);
             }
+        }
+        else
+        {
+            exchange = getExchangeRegistry(session).getDefaultExchange();
+        }
 
-            if(method.hasQueue())
+
+        if(method.hasQueue())
+        {
+
+            queue = getQueue(session, method.getQueue());
+            if(queue == null)
             {
+                result.setQueueNotFound(true);
+            }
+        
 
-                AMQQueue queue = getQueue(session, method.getQueue());
-                if(queue == null)
-                {
-                    result.setQueueNotFound(true);
-                }
+            if(exchange != null && queue != null)
+            {
+
+                boolean queueMatched = exchange.isBound(queue);
+
+                result.setQueueNotMatched(!queueMatched);
 
-                if(exchange != null && queue != null)
+
+                if(method.hasBindingKey())
                 {
 
-                    if(method.hasBindingKey())
+                    if(method.hasArguments())
+                    {
+                        // TODO
+                    }
+                    if(queueMatched)
                     {
-
-                        if(method.hasArguments())
-                        {
-                            // TODO
-                        }
                         result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
-
                     }
+                    else
+                    {
+                        result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+                    }
+                }
+                else if (method.hasArguments())
+                {
+                    // TODO
+                    
+                }
 
-                    result.setQueueNotMatched(!exchange.isBound(queue));
+                result.setQueueNotMatched(!exchange.isBound(queue));
 
-                }
             }
             else if(exchange != null && method.hasBindingKey())
             {
@@ -492,31 +683,20 @@
             }
 
         }
-        else if(method.hasQueue())
+        else if(exchange != null && method.hasBindingKey())
         {
-            AMQQueue queue = getQueue(session, method.getQueue());
-            if(queue == null)
+            if(method.hasArguments())
             {
-                result.setQueueNotFound(true);
-            }
-            else
-            {
-                if(method.hasBindingKey())
-                {
-                    if(method.hasArguments())
-                    {
-                        // TODO
-                    }
-
-                    // TODO
-                }
+                // TODO
             }
+            result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
 
         }
 
 
         session.executionResult((int) method.getId(), result);
 
+
     }
 
     private AMQQueue getQueue(Session session, String queue)
@@ -580,6 +760,11 @@
                     try
                     {
                         queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
+                        if(method.getExclusive())
+                        {
+                            queue.setPrincipalHolder((ServerSession)session);
+                        }
+
 
                         if (queue.isDurable() && !queue.isAutoDelete())
                         {
@@ -597,6 +782,64 @@
                             queue.bind(defaultExchange, new AMQShortString(queueName), null);
 
                         }
+
+                        if(method.hasAutoDelete()
+                           && method.getAutoDelete()
+                           && method.hasExclusive()
+                           && method.getExclusive())
+                        {
+                            final AMQQueue q = queue;
+                            final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+                            {
+
+                                public void doTask(ServerSession session)
+                                {
+                                    try
+                                    {
+                                        q.delete();
+                                    }
+                                    catch (AMQException e)
+                                    {
+                                        //TODO
+                                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                                    }
+                                }
+                            };
+                            final ServerSession s = (ServerSession) session;
+                            s.addSessionCloseTask(deleteQueueTask);
+                            queue.addQueueDeleteTask(new AMQQueue.Task()
+                            {
+
+                                public void doTask(AMQQueue queue) throws AMQException
+                                {
+                                    s.removeSessionCloseTask(deleteQueueTask);
+                                }
+                            });
+                        }
+                        else if(method.getExclusive())
+                        {
+                            {
+                            final AMQQueue q = queue;
+                            final ServerSession.Task removeExclusive = new ServerSession.Task()
+                            {
+
+                                public void doTask(ServerSession session)
+                                {
+                                    q.setPrincipalHolder(null);
+                                }
+                            };
+                            final ServerSession s = (ServerSession) session;
+                            s.addSessionCloseTask(removeExclusive);
+                            queue.addQueueDeleteTask(new AMQQueue.Task()
+                            {
+
+                                public void doTask(AMQQueue queue) throws AMQException
+                                {
+                                    s.removeSessionCloseTask(removeExclusive);
+                                }
+                            });
+                        }
+                        }
                     }
                     catch (AMQException e)
                     {
@@ -605,13 +848,12 @@
                     }
                 }
             }
-            else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName())))
+            else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
             {
 
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
-                                                                           + "declared on another client ID('"
-                                                                           + queue.getOwner() + "')";
+                                                                           + "declared on another session";
                     ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
 
                     exception(session, method, errorCode, description);
@@ -695,7 +937,11 @@
             }
             else
             {
-                if (method.getIfEmpty() && !queue.isEmpty())
+                if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+                {
+                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+                }
+                else if (method.getIfEmpty() && !queue.isEmpty())
                 {
                     exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty");
                 }
@@ -746,7 +992,7 @@
         String queueName = method.getQueue();
         if(queueName == null || queueName.length()==0)
         {
-            exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied");
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied");
 
         }
         else
@@ -778,7 +1024,25 @@
     @Override
     public void queueQuery(Session session, QueueQuery method)
     {
-        super.queueQuery(session, method);
+        QueueQueryResult result = new QueueQueryResult();
+
+        AMQQueue queue = getQueue(session, method.getQueue());
+
+        if(queue != null)
+        {
+            result.setQueue(queue.getName().toString());
+            result.setDurable(queue.isDurable());
+            result.setExclusive(queue.isExclusive());
+            result.setAutoDelete(queue.isAutoDelete());
+            result.setArguments(queue.getArguments());
+            result.setMessageCount(queue.getMessageCount());
+            result.setSubscriberCount(queue.getConsumerCount());
+
+        }
+
+
+        session.executionResult((int) method.getId(), result);
+
     }
 
     @Override
@@ -835,15 +1099,14 @@
     public void closed(Session session)
     {
         super.closed(session);
-        for(Subscription_0_10 sub : getSubscriptions(session).values())
+        for(Subscription_0_10 sub : getSubscriptions(session))
         {
-            sub.close();
+            ((ServerSession)session).unregister(sub);
         }
         ((ServerSession)session).onClose();
-        ((ServerSession)session).onClose();
     }
 
-    public Map<String, Subscription_0_10> getSubscriptions(Session session)
+    public Collection<Subscription_0_10> getSubscriptions(Session session)
     {
         return ((ServerSession)session).getSubscriptions();
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Mon Aug 31 11:36:26 2009
@@ -40,7 +40,10 @@
 
         try
         {
-
+            for(Action action : _postCommitActions)
+            {
+                action.onRollback();
+            }
         }
         finally
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java Mon Aug 31 11:36:26 2009
@@ -30,6 +30,8 @@
     public static interface Action
     {
         public void postCommit();
+
+        public void onRollback();
     }
 
     void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction);

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Aug 31 11:36:26 2009
@@ -318,6 +318,11 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public boolean isAcquiredBy(Subscription subscription)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
                 public void setDeliveredToSubscription()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
@@ -368,6 +373,11 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public void requeue(Subscription subscription)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
                 public void dequeue(final StoreContext storeContext) throws FailedDequeueException
                 {
                     //To change body of implemented methods use File | Settings | File Templates.

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Aug 31 11:36:26 2009
@@ -32,6 +32,7 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.PrincipalHolder;
 import org.apache.qpid.AMQException;
 import org.apache.commons.configuration.Configuration;
 
@@ -46,6 +47,8 @@
     private boolean _deleted = false;
     private AMQShortString _name;
 
+    private PrincipalHolder _principalHolder;
+
     public MockAMQQueue(String name)
     {
        _name = new AMQShortString(name);
@@ -171,6 +174,11 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
     {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -312,6 +320,16 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public boolean isExclusive()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Map<String, Object> getArguments()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public ManagedObject getManagedObject()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -333,4 +351,14 @@
         
     }
 
+    public PrincipalHolder getPrincipalHolder()
+    {
+        return _principalHolder;
+    }
+
+    public void setPrincipalHolder(PrincipalHolder principalHolder)
+    {
+        _principalHolder = principalHolder;
+    }
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Aug 31 11:36:26 2009
@@ -44,6 +44,11 @@
         return false;
     }
 
+    public boolean isAcquiredBy(Subscription subscription)
+    {
+        return false;
+    }
+
     public void addStateChangeListener(StateChangeListener listener)
     {
 
@@ -163,7 +168,12 @@
 
     }
 
-    
+    public void requeue(Subscription subscription)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
     public void setDeliveredToSubscription()
     {
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Aug 31 11:36:26 2009
@@ -186,7 +186,7 @@
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
         
         // Check removing the subscription removes it's information from the queue
         _queue.unregisterSubscription(_subscription);
@@ -197,7 +197,7 @@
         
         AMQMessage messageB = createMessage(new Long (25));
         _queue.enqueue(messageB);
-        QueueEntry entry = _subscription.getLastSeenEntry();
+        QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
         assertNull(entry);
     }
     
@@ -207,7 +207,7 @@
         _queue.enqueue(messageA);
         _queue.registerSubscription(_subscription, false);
         Thread.sleep(150);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
     }
 
     public void testExclusiveConsumer() throws AMQException
@@ -224,7 +224,7 @@
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
         
         // Check we cannot add a second subscriber to the queue
         Subscription subB = new MockSubscription();
@@ -273,7 +273,7 @@
         Long id = new Long(26);
         AMQMessage message = createMessage(id);
         _queue.enqueue(message);
-        QueueEntry entry = _subscription.getLastSeenEntry();
+        QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
         entry.setRedelivered(true);
         _queue.resend(entry, _subscription);
         

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Mon Aug 31 11:36:26 2009
@@ -39,7 +39,7 @@
     private AMQShortString tag = new AMQShortString("mocktag");
     private AMQQueue queue = null;
     private StateListener _listener = null;
-    private QueueEntry lastSeen = null;
+    private AMQQueue.Context _queueContext = null;
     private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
@@ -69,9 +69,9 @@
         return tag ;
     }
 
-    public QueueEntry getLastSeenEntry()
+    public AMQQueue.Context getQueueContext()
     {
-        return lastSeen;
+        return _queueContext;
     }
 
     public SubscriptionAcquiredState getOwningState()
@@ -147,25 +147,23 @@
     {
     }
 
+    public void onDequeue(QueueEntry queueEntry)
+    {
+    }
+
     public void restoreCredit(QueueEntry queueEntry)
     {
+        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public void send(QueueEntry msg) throws AMQException
     {
-        lastSeen = msg;
         messages.add(msg);
     }
 
-    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+    public void setQueueContext(AMQQueue.Context queueContext)
     {
-        boolean result = false;
-        if (expected != null)
-        {
-            result = (expected.equals(lastSeen));
-        }
-        lastSeen = newValue;
-        return result;
+        _queueContext = queueContext;
     }
 
     public void setQueue(AMQQueue queue)

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Mon Aug 31 11:36:26 2009
@@ -22,6 +22,10 @@
 
 import org.apache.mina.common.ByteBuffer;
 
+import java.util.Date;
+import java.util.Map;
+import java.math.BigDecimal;
+
 /**
  * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
  * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
@@ -113,4 +117,63 @@
         return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
     }
 
+
+    public static AMQTypedValue toTypedValue(Object val)
+    {
+        if(val == null)
+        {
+            return AMQType.VOID.asTypedValue(null);
+        }
+
+        Class klass = val.getClass();
+        if(klass == String.class)
+        {
+            return AMQType.ASCII_STRING.asTypedValue(val);
+        }
+        else if(klass == Character.class)
+        {
+            return AMQType.ASCII_CHARACTER.asTypedValue(val);
+        }
+        else if(klass == Integer.class)
+        {
+            return AMQType.INT.asTypedValue(val);
+        }
+        else if(klass == Long.class)
+        {
+            return AMQType.LONG.asTypedValue(val);
+        }
+        else if(klass == Float.class)
+        {
+            return AMQType.FLOAT.asTypedValue(val);
+        }
+        else if(klass == Double.class)
+        {
+            return AMQType.DOUBLE.asTypedValue(val);
+        }
+        else if(klass == Date.class)
+        {
+            return AMQType.TIMESTAMP.asTypedValue(val);
+        }
+        else if(klass == Byte.class)
+        {
+            return AMQType.BYTE.asTypedValue(val);
+        }
+        else if(klass == Boolean.class)
+        {
+            return AMQType.BOOLEAN.asTypedValue(val);
+        }
+        else if(klass == byte[].class)
+        {
+            return AMQType.BINARY.asTypedValue(val);
+        }
+        else if(klass == BigDecimal.class)
+        {
+            return AMQType.DECIMAL.asTypedValue(val);
+        }
+        else if(val instanceof Map)
+        {
+            return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val));
+        }
+        return null;
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Mon Aug 31 11:36:26 2009
@@ -828,6 +828,7 @@
         recalculateEncodedSize();
     }
 
+
     public static interface FieldTableElementProcessor
     {
         public boolean processElement(String propertyName, AMQTypedValue value);
@@ -1187,4 +1188,24 @@
 
         return _properties.equals(f._properties);
     }
+
+    public static FieldTable convertToFieldTable(Map<String, Object> map)
+    {
+        if (map != null)
+        {
+            FieldTable table = new FieldTable();
+            for(Map.Entry<String,Object> entry : map.entrySet())
+            {
+                table.put(new AMQShortString(entry.getKey()), entry.getValue());
+            }
+
+            return table;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Aug 31 11:36:26 2009
@@ -316,7 +316,14 @@
     public void dispatch(Method method)
     {
         Session ssn = getSession(method.getChannel());
-        ssn.received(method);
+        if(ssn != null)
+        {
+            ssn.received(method);
+        }
+        else
+        {
+            // TODO
+        }
     }
 
     public int getChannelMax()

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Mon Aug 31 11:36:26 2009
@@ -179,6 +179,11 @@
         }
     }
 
+    public boolean hasCompletionListener()
+    {
+        return completionListener != null;
+    }
+
     public String toString()
     {
         StringBuilder str = new StringBuilder();

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Aug 31 11:36:26 2009
@@ -617,7 +617,7 @@
                 {
                     sessionCommandPoint(0, 0);
                 }
-                if (expiry > 0 && !m.isUnreliable())
+                if ((expiry > 0 && !m.isUnreliable()) || m.hasCompletionListener())
                 {
                     commands[mod(next, commands.length)] = m;
                     commandBytes += m.getBodySize();

Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Mon Aug 31 11:36:26 2009
@@ -137,6 +137,7 @@
         }
         catch (Throwable t)
         {
+            t.printStackTrace();
             if (!(shutdownBroken &&
                   t instanceof SocketException &&
                   t.getMessage().equalsIgnoreCase("socket closed") &&

Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=809544&r1=809543&r2=809544&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Mon Aug 31 11:36:26 2009
@@ -33,6 +33,7 @@
     private final List<QueueEntry> messages;
     private final Object key;
     private boolean isSuspended;
+    private AMQQueue.Context _queueContext;
 
     public SubscriptionTestHelper(Object key)
     {
@@ -101,11 +102,16 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void restoreCredit(final QueueEntry queueEntry)
+    public void onDequeue(final QueueEntry queueEntry)
     {
 
     }
 
+    public void restoreCredit(QueueEntry queueEntry)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void setStateListener(final StateListener listener)
     {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -116,9 +122,14 @@
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public QueueEntry getLastSeenEntry()
+    public AMQQueue.Context getQueueContext()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _queueContext;
+    }
+
+    public void setQueueContext(AMQQueue.Context queueContext)
+    {
+        _queueContext = queueContext;
     }
 
     public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org