You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 22:24:57 UTC

svn commit: r1157654 - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src: main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/protocol/ main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/security/auth/manag...

Author: rgodfrey
Date: Sun Aug 14 20:24:57 2011
New Revision: 1157654

URL: http://svn.apache.org/viewvc?rev=1157654&view=rev
Log:
Reapplying 1-0 sandbox changes to correct branch (hopefully)

Modified:
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Sun Aug 14 20:24:57 2011
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.logging.*;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -77,6 +78,8 @@ public class Main
     private static final int IPV4_ADDRESS_LENGTH = 4;
 
     private static final char IPV4_LITERAL_SEPARATOR = '.';
+    private java.util.logging.Logger FRAME_LOGGER;
+    private java.util.logging.Logger RAW_LOGGER;
 
     protected static class InitException extends Exception
     {
@@ -249,6 +252,10 @@ public class Main
 
     protected void startup() throws Exception
     {
+
+        FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log");
+        RAW_LOGGER = updateLogger("RAW", "qpid-raw.log");
+
         final String QpidHome = System.getProperty(QPID_HOME);
         final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
         final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
@@ -449,6 +456,39 @@ public class Main
 
     }
 
+    private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException
+    {
+        java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType);
+        logger.setLevel(Level.FINE);
+        Formatter formatter = new Formatter()
+        {
+            @Override
+            public String format(final LogRecord record)
+            {
+
+                return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n";
+            }
+        };
+        for(Handler handler : logger.getHandlers())
+        {
+            logger.removeHandler(handler);
+        }
+        Handler handler = new ConsoleHandler();
+
+        handler.setLevel(Level.FINE);
+        handler.setFormatter(formatter);
+
+        logger.addHandler(handler);
+
+
+        handler = new FileHandler(logFileName, true);
+        handler.setLevel(Level.FINE);
+        handler.setFormatter(formatter);
+
+        logger.addHandler(handler);
+        return logger;
+    }
+
     private void parsePortArray(Set<Integer> ports, String[] portStr)
             throws InitException
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Sun Aug 14 20:24:57 2011
@@ -35,7 +35,7 @@ public class MultiVersionProtocolEngineF
     ;
 
 
-    public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
+    public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 };
 
     private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Aug 14 20:24:57 2011
@@ -188,7 +188,7 @@ public class SimpleAMQQueue implements A
     //TODO : persist creation time
     private long _createTime = System.currentTimeMillis();
     private ConfigurationPlugin _queueConfiguration;
-
+    private final boolean _isTopic;
 
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
@@ -234,10 +234,12 @@ public class SimpleAMQQueue implements A
         _exclusive = exclusive;
         _virtualHost = virtualHost;
         _entries = entryListFactory.createQueueEntryList(this);
-        _arguments = arguments;
+        _arguments = arguments == null ? Collections.EMPTY_MAP : arguments;
 
         _id = virtualHost.getConfigStore().createId();
 
+        _isTopic = arguments != null && arguments.containsKey("topic");
+
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
         _logSubject = new QueueLogSubject(this);
@@ -329,7 +331,7 @@ public class SimpleAMQQueue implements A
     {
         return _exclusive;
     }
-    
+
     public void setExclusive(boolean exclusive) throws AMQException
     {
         _exclusive = exclusive;
@@ -404,8 +406,8 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied");
         }
-        
-        
+
+
         if (hasExclusiveSubscriber())
         {
             throw new ExistingExclusiveSubscription();
@@ -435,14 +437,14 @@ public class SimpleAMQQueue implements A
                 subscription.setNoLocal(_nolocal);
             }
             _subscriptionList.add(subscription);
-            
+
             //Increment consumerCountHigh if necessary. (un)registerSubscription are both
             //synchronized methods so we don't need additional synchronization here
             if(_counsumerCountHigh.get() < getConsumerCount())
             {
                 _counsumerCountHigh.incrementAndGet();
             }
-            
+
             if (isDeleted())
             {
                 subscription.queueDeleted(this);
@@ -488,6 +490,11 @@ public class SimpleAMQQueue implements A
                 // queue. This is because the delete method uses the subscription set which has just been cleared
                 subscription.queueDeleted(this);
             }
+
+            if(_subscriptionList.size() == 0 && _isTopic)
+            {
+                clearQueue();
+            }
         }
 
     }
@@ -514,10 +521,10 @@ public class SimpleAMQQueue implements A
                 break;
             }
         }
-        
+
         reconfigure();
     }
-    
+
     private void reconfigure()
     {
         //Reconfigure the queue for to reflect this new binding.
@@ -543,7 +550,7 @@ public class SimpleAMQQueue implements A
     public void removeBinding(final Binding binding)
     {
         _bindings.remove(binding);
-        
+
         reconfigure();
     }
 
@@ -570,101 +577,104 @@ public class SimpleAMQQueue implements A
 
     public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
-        incrementTxnEnqueueStats(message);
-        incrementQueueCount();
-        incrementQueueSize(message);
         _totalMessagesReceived.incrementAndGet();
 
 
-        QueueEntry entry;
         Subscription exclusiveSub = _exclusiveSubscriber;
-
-        if (exclusiveSub != null)
+        if(!_isTopic || _subscriptionList.size()!=0)
         {
-            exclusiveSub.getSendLock();
+            incrementTxnEnqueueStats(message);
+            incrementQueueCount();
+            incrementQueueSize(message);
 
-            try
-            {
-                entry = _entries.add(message);
+            QueueEntry entry;
 
-                deliverToSubscription(exclusiveSub, entry);
-            }
-            finally
+            if (exclusiveSub != null)
             {
-                exclusiveSub.releaseSendLock();
-            }
-        }
-        else
-        {
-            entry = _entries.add(message);
-            /*
-
-            iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+                exclusiveSub.getSendLock();
 
-             */
-            SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
-            SubscriptionList.SubscriptionNode nextNode = node.getNext();
-            if (nextNode == null)
-            {
-                nextNode = _subscriptionList.getHead().getNext();
-            }
-            while (nextNode != null)
-            {
-                if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+                try
                 {
-                    break;
+                    entry = _entries.add(message);
+
+                    deliverToSubscription(exclusiveSub, entry);
                 }
-                else
+                finally
                 {
-                    node = _lastSubscriptionNode.get();
-                    nextNode = node.getNext();
-                    if (nextNode == null)
-                    {
-                        nextNode = _subscriptionList.getHead().getNext();
-                    }
+                    exclusiveSub.releaseSendLock();
                 }
             }
+            else
+            {
+                entry = _entries.add(message);
+                /*
 
-            // always do one extra loop after we believe we've finished
-            // this catches the case where we *just* miss an update
-            int loops = 2;
+                iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
 
-            while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
-            {
+                 */
+                SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+                SubscriptionList.SubscriptionNode nextNode = node.getNext();
                 if (nextNode == null)
                 {
-                    loops--;
-                    nextNode = _subscriptionList.getHead();
+                    nextNode = _subscriptionList.getHead().getNext();
                 }
-                else
+                while (nextNode != null)
                 {
-                    // if subscription at end, and active, offer
-                    Subscription sub = nextNode.getSubscription();
-                    deliverToSubscription(sub, entry);
+                    if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+                    {
+                        break;
+                    }
+                    else
+                    {
+                        node = _lastSubscriptionNode.get();
+                        nextNode = node.getNext();
+                        if (nextNode == null)
+                        {
+                            nextNode = _subscriptionList.getHead().getNext();
+                        }
+                    }
                 }
-                nextNode = nextNode.getNext();
 
+                // always do one extra loop after we believe we've finished
+                // this catches the case where we *just* miss an update
+                int loops = 2;
+
+                while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+                {
+                    if (nextNode == null)
+                    {
+                        loops--;
+                        nextNode = _subscriptionList.getHead();
+                    }
+                    else
+                    {
+                        // if subscription at end, and active, offer
+                        Subscription sub = nextNode.getSubscription();
+                        deliverToSubscription(sub, entry);
+                    }
+                    nextNode = nextNode.getNext();
+
+                }
             }
-        }
 
 
-        if (!(entry.isAcquired() || entry.isDeleted()))
-        {
-            checkSubscriptionsNotAheadOfDelivery(entry);
+            if (!(entry.isAcquired() || entry.isDeleted()))
+            {
+                checkSubscriptionsNotAheadOfDelivery(entry);
 
-            deliverAsync();
-        }
+                deliverAsync();
+            }
 
-        if(_managedObject != null)
-        {
-            _managedObject.checkForNotification(entry.getMessage());
-        }
+            if(_managedObject != null)
+            {
+                _managedObject.checkForNotification(entry.getMessage());
+            }
 
-        if(action != null)
-        {
-            action.onEnqueue(entry);
+            if(action != null)
+            {
+                action.onEnqueue(entry);
+            }
         }
-
     }
 
     private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -720,20 +730,20 @@ public class SimpleAMQQueue implements A
     {
         getAtomicQueueCount().incrementAndGet();
     }
-    
+
     private void incrementTxnEnqueueStats(final ServerMessage message)
     {
         SessionConfig session = message.getSessionConfig();
-        
+
         if(session !=null && session.isTransactional())
         {
             _msgTxnEnqueues.incrementAndGet();
             _byteTxnEnqueues.addAndGet(message.getSize());
         }
     }
-    
+
     private void incrementTxnDequeueStats(QueueEntry entry)
-    {      
+    {
         _msgTxnDequeues.incrementAndGet();
         _byteTxnDequeues.addAndGet(entry.getSize());
     }
@@ -747,6 +757,40 @@ public class SimpleAMQQueue implements A
         incrementUnackedMsgCount();
 
         sub.send(entry);
+
+        if(_isTopic)
+        {
+            if(allSubscriptionsAhead(entry) && entry.acquire())
+            {
+                entry.discard();
+            }
+        }
+    }
+
+    private boolean allSubscriptionsAhead(final QueueEntry entry)
+    {
+        SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+        while(subIter.advance() && !entry.isAcquired())
+        {
+            final Subscription subscription = subIter.getNode().getSubscription();
+            if(!subscription.isClosed())
+            {
+                QueueContext context = (QueueContext) subscription.getQueueContext();
+                if(context != null)
+                {
+                    QueueEntry subnode = context._lastSeenEntry;
+                    if(subnode.compareTo(entry)<0)
+                    {
+                        return false;
+                    }
+                }
+                else
+                {
+                    return false;
+                }
+            }
+        }
+        return true;
     }
 
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -831,7 +875,7 @@ public class SimpleAMQQueue implements A
         {
             _deliveredMessages.decrementAndGet();
         }
-        
+
         if(sub != null && sub.isSessionTransactional())
         {
             incrementTxnDequeueStats(entry);
@@ -888,7 +932,7 @@ public class SimpleAMQQueue implements A
     {
         return _subscriptionList.size();
     }
-    
+
     public int getConsumerCountHigh()
     {
         return _counsumerCountHigh.get();
@@ -1298,7 +1342,7 @@ public class SimpleAMQQueue implements A
     }
 
     public long clearQueue() throws AMQException
-    {         
+    {
         return clear(0l);
     }
 
@@ -1309,7 +1353,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: queue " + getName());
         }
-        
+
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
@@ -1376,7 +1420,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: " + getName());
         }
-        
+
         if (!_deleted.getAndSet(true))
         {
 
@@ -1670,12 +1714,9 @@ public class SimpleAMQQueue implements A
             {
                 sub.getSendLock();
                 atTail = attemptDelivery(sub);
-                if (atTail && sub.isAutoClose())
+                if (atTail && getNextAvailableEntry(sub) == null)
                 {
-                    unregisterSubscription(sub);
-
-                    sub.confirmAutoClose();
-
+                    sub.queueEmpty();
                 }
                 else if (!atTail)
                 {
@@ -1696,6 +1737,7 @@ public class SimpleAMQQueue implements A
         {
             advanceAllSubscriptions();
         }
+
         return atTail;
     }
 
@@ -1854,13 +1896,12 @@ public class SimpleAMQQueue implements A
                     {
                         if (extraLoops == 0)
                         {
-                            deliveryIncomplete = false;
-                            if (sub.isAutoClose())
+                            if(getNextAvailableEntry(sub) == null)
                             {
-                                unregisterSubscription(sub);
-
-                                sub.confirmAutoClose();
+                                sub.queueEmpty();
                             }
+                            deliveryIncomplete = false;
+
                         }
                         else
                         {
@@ -2166,22 +2207,22 @@ public class SimpleAMQQueue implements A
     {
         return _dequeueSize.get();
     }
-    
+
     public long getByteTxnEnqueues()
     {
         return _byteTxnEnqueues.get();
     }
-    
+
     public long getByteTxnDequeues()
     {
         return _byteTxnDequeues.get();
     }
-    
+
     public long getMsgTxnEnqueues()
     {
         return _msgTxnEnqueues.get();
     }
-    
+
     public long getMsgTxnDequeues()
     {
         return _msgTxnDequeues.get();
@@ -2218,21 +2259,21 @@ public class SimpleAMQQueue implements A
     {
         return _unackedMsgCountHigh.get();
     }
-    
+
     public long getUnackedMessageCount()
     {
         return _unackedMsgCount.get();
     }
-    
+
     public void decrementUnackedMsgCount()
     {
         _unackedMsgCount.decrementAndGet();
     }
-    
+
     private void incrementUnackedMsgCount()
     {
         long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-        
+
         long unackedMsgCountHigh;
         while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Sun Aug 14 20:24:57 2011
@@ -23,14 +23,17 @@ package org.apache.qpid.server.security.
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 
-public interface AuthenticationManager extends Closeable
+public interface AuthenticationManager extends Closeable, CallbackHanderSource
 {
     String getMechanisms();
 
     SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
 
     AuthenticationResult authenticate(SaslServer server, byte[] response);
+
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Sun Aug 14 20:24:57 2011
@@ -234,4 +234,9 @@ public class PrincipalDatabaseAuthentica
     {
         Security.removeProvider(PROVIDER_NAME);
     }
+
+    public CallbackHandler getHandler(String mechanism)
+    {
+        return _callbackHandlerMap.get(mechanism);
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java Sun Aug 14 20:24:57 2011
@@ -45,9 +45,9 @@ public class AmqPlainSaslServerFactory i
 
     public String[] getMechanismNames(Map props)
     {
-        if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE))
+            props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Sun Aug 14 20:24:57 2011
@@ -47,10 +47,10 @@ public class AnonymousSaslServerFactory 
 
     public String[] getMechanismNames(Map props)
     {
-        if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
             props.containsKey(Sasl.POLICY_NOACTIVE) ||
-            props.containsKey(Sasl.POLICY_NOANONYMOUS))
+            props.containsKey(Sasl.POLICY_NOANONYMOUS)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java Sun Aug 14 20:24:57 2011
@@ -45,9 +45,9 @@ public class PlainSaslServerFactory impl
 
     public String[] getMechanismNames(Map props)
     {
-        if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE))
+            props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Sun Aug 14 20:24:57 2011
@@ -106,7 +106,7 @@ public class ServerSession extends Sessi
             new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
 
     private ServerTransaction _transaction;
-    
+
     private final AtomicLong _txnStarts = new AtomicLong(0);
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -138,7 +138,7 @@ public class ServerSession extends Sessi
     public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
     {
         super(connection, delegate, name, expiry);
-        _connectionConfig = connConfig;        
+        _connectionConfig = connConfig;
         _transaction = new AutoCommitTransaction(this.getMessageStore());
         _principal = new UserPrincipal(connection.getAuthorizationID());
         _reference = new WeakReference(this);
@@ -331,7 +331,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    public void removeDispositionListener(Method method)                               
+    public void removeDispositionListener(Method method)
     {
         _messageDispositionListenerMap.remove(method.getId());
     }
@@ -351,7 +351,7 @@ public class ServerSession extends Sessi
         {
             task.doTask(this);
         }
-        
+
         CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
     }
 
@@ -396,7 +396,7 @@ public class ServerSession extends Sessi
 
     public void unregister(Subscription_0_10 sub)
     {
-        _subscriptions.remove(sub.getConsumerTag().toString());
+        _subscriptions.remove(sub.getName());
         try
         {
             sub.getSendLock();
@@ -417,7 +417,7 @@ public class ServerSession extends Sessi
             sub.releaseSendLock();
         }
     }
-    
+
     public boolean isTransactional()
     {
         // this does not look great but there should only be one "non-transactional"
@@ -435,7 +435,7 @@ public class ServerSession extends Sessi
     public void commit()
     {
         _transaction.commit();
-        
+
         _txnCommits.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
@@ -444,13 +444,13 @@ public class ServerSession extends Sessi
     public void rollback()
     {
         _transaction.rollback();
-        
+
         _txnRejects.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
     }
 
-    
+
     private void incrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -460,7 +460,7 @@ public class ServerSession extends Sessi
             _txnCount.compareAndSet(0,1);
         }
     }
-    
+
     private void decrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -490,7 +490,7 @@ public class ServerSession extends Sessi
     {
         return _txnCount.get();
     }
-    
+
     public Principal getPrincipal()
     {
         return _principal;
@@ -606,7 +606,6 @@ public class ServerSession extends Sessi
         return (LogSubject) this;
     }
 
-    @Override
     public String toLogString()
     {
        return "[" +

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Sun Aug 14 20:24:57 2011
@@ -20,12 +20,7 @@
  */
 package org.apache.qpid.server.virtualhost;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -36,7 +31,6 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -63,6 +57,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -115,6 +110,7 @@ public class VirtualHostImpl implements 
     private final long _createTime = System.currentTimeMillis();
     private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
     private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
+    private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
 
     public IConnectionRegistry getConnectionRegistry()
     {
@@ -663,6 +659,17 @@ public class VirtualHostImpl implements 
         }
     }
 
+    public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
+    {
+        LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
+        if(linkRegistry == null)
+        {
+            linkRegistry = new LinkRegistry();
+            _linkRegistry.put(remoteContainerId, linkRegistry);
+        }
+        return linkRegistry;
+    }
+
     public ConfigStore getConfigStore()
     {
         return getApplicationRegistry().getConfigStore();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun Aug 14 20:24:57 2011
@@ -100,14 +100,14 @@ public class AbstractHeadersExchangeTest
 
         return bind(queueName, queueName, getHeadersMap(bindings));
     }
-    
+
     protected void unbind(TestQueue queue, String... bindings) throws AMQException
     {
         String queueName = queue.getName();
         //TODO - check this
         exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings)));
     }
-    
+
     protected int getCount()
     {
         return count;
@@ -120,7 +120,7 @@ public class AbstractHeadersExchangeTest
         exchange.onBind(new Binding(null,key, queue, exchange, args));
         return queue;
     }
-    
+
 
     protected int route(Message m) throws AMQException
     {
@@ -175,14 +175,14 @@ public class AbstractHeadersExchangeTest
             }
 
     }
-    
+
     static Map<String,Object> getHeadersMap(String... entries)
     {
         if(entries == null)
         {
             return null;
         }
-        
+
         Map<String,Object> headers = new HashMap<String,Object>();
 
         for (String s : entries)
@@ -438,7 +438,7 @@ public class AbstractHeadersExchangeTest
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void requeue(Subscription subscription) 
+                public void requeue(Subscription subscription)
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1157654&r1=1157653&r2=1157654&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Sun Aug 14 20:24:57 2011
@@ -31,10 +31,10 @@ import org.apache.qpid.server.message.Se
 /**
  * Mock Server Message allowing its persistent flag to be controlled from test.
  */
-class MockServerMessage implements ServerMessage
+class MockServerMessage implements ServerMessage<MockServerMessage>
 {
     /**
-     * 
+     *
      */
     private final boolean persistent;
 
@@ -46,67 +46,67 @@ class MockServerMessage implements Serve
         this.persistent = persistent;
     }
 
-    @Override
+
     public boolean isPersistent()
     {
         return persistent;
     }
 
-    @Override
-    public MessageReference newReference()
+
+    public MessageReference<MockServerMessage> newReference()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public boolean isImmediate()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public long getSize()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public SessionConfig getSessionConfig()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public String getRoutingKey()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public AMQMessageHeader getMessageHeader()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public long getExpiration()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public int getContent(ByteBuffer buf, int offset)
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public long getArrivalTime()
     {
         throw new NotImplementedException();
     }
 
-    @Override
+
     public Long getMessageNumber()
     {
         return 0L;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org