You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/08/27 16:39:52 UTC

svn commit: r808436 - in /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server: exchange/DefaultExchangeFactory.java transport/ServerSessionDelegate.java

Author: rgodfrey
Date: Thu Aug 27 14:39:51 2009
New Revision: 808436

URL: http://svn.apache.org/viewvc?rev=808436&view=rev
Log:
More updates 

Modified:
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=808436&r1=808435&r2=808436&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Aug 27 14:39:51 2009
@@ -62,7 +62,7 @@
     public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
             throws AMQException
     {
-        ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
+        ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(new AMQShortString(type));
         if (exchType == null)
         {
 
@@ -106,7 +106,7 @@
                     return;
                 }
                 Class<? extends ExchangeType> exchangeTypeClass = exchangeType.getClass();
-                ExchangeType type = exchangeTypeClass.newInstance();
+                ExchangeType<? extends ExchangeType> type = exchangeTypeClass.newInstance();
                 registerExchangeType(type);
             }
             catch (ClassCastException classCastEx)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=808436&r1=808435&r2=808436&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Aug 27 14:39:51 2009
@@ -102,33 +102,65 @@
     @Override
     public void messageSubscribe(Session session, MessageSubscribe method)
     {
-        String destination = method.getDestination();
-        String queueName = method.getQueue();
-        QueueRegistry queueRegistry = getQueueRegistry(session);
 
-        AMQQueue queue = queueRegistry.getQueue(queueName);
+        //TODO - work around broken Python tests
+        if(!method.hasAcceptMode())
+        {
+            method.setAcceptMode(MessageAcceptMode.EXPLICIT);
+        }
+        if(!method.hasAcquireMode())
+        {
+            method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED);
 
-        //TODO null check
+        }
 
+       /* if(!method.hasAcceptMode())
+        {
+            exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Accept-mode not supplied");
+        }
+        else if(!method.hasAcquireMode())
+        {
+            exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied");
+        }
+        else */if(!method.hasQueue())
+        {
+            exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
+        }
+        else
+        {
+            String destination = method.getDestination();
+            String queueName = method.getQueue();
+            QueueRegistry queueRegistry = getQueueRegistry(session);
 
-        FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
 
-        // TODO filters
+            AMQQueue queue = queueRegistry.getQueue(queueName);
 
-        Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+            if(queue == null)
+            {
+                exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");    
+            }
+            else
+            {
 
-        ((ServerSession)session).register(destination, sub);
-        try
-        {
-            queue.registerSubscription(sub, method.getExclusive());
-        }
-        catch (AMQException e)
-        {
-            // TODO
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            throw new RuntimeException(e);
-        }        
+                FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
+
+                // TODO filters
 
+                Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+
+                ((ServerSession)session).register(destination, sub);
+                try
+                {
+                    queue.registerSubscription(sub, method.getExclusive());
+                }
+                catch (AMQException e)
+                {
+                    // TODO
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    throw new RuntimeException(e);
+                }
+            }
+        }
     }
 
 
@@ -266,11 +298,13 @@
 
 
             }
-            else
+            else if(exchange == null)
             {
                 ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
                 ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
 
+
+
                 try
                 {
 
@@ -293,6 +327,10 @@
                 }
 
             }
+            else
+            {
+                // TODO check same as declared
+            }
 
         }
     }
@@ -371,7 +409,23 @@
     @Override
     public void exchangeQuery(Session session, ExchangeQuery method)
     {
-        super.exchangeQuery(session, method);
+
+        ExchangeQueryResult result = new ExchangeQueryResult();
+
+        Exchange exchange = getExchange(session, method.getName());
+
+        if(exchange != null)
+        {
+            result.setDurable(exchange.isDurable());
+            result.setType(exchange.getType().toString());
+            result.setNotFound(false);
+        }
+        else
+        {
+            result.setNotFound(true);
+        }
+
+        session.executionResult((int) method.getId(), result);
     }
 
     @Override
@@ -462,7 +516,7 @@
 
 
         session.executionResult((int) method.getId(), result);
-        super.exchangeBound(session, method);
+
     }
 
     private AMQQueue getQueue(Session session, String queue)
@@ -551,7 +605,7 @@
                     }
                 }
             }
-            else if (queue.getOwner() != null && !((ServerSession)session).getPrincipal().getName().equals(queue.getOwner()))
+            else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName())))
             {
 
                     String description = "Cannot declare queue('" + queueName + "'),"



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org