You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/17 14:17:11 UTC

svn commit: r900135 - in /qpid/branches/0.5.x-dev/qpid/java: broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/subscription/ broker/src/test/java/or...

Author: rgodfrey
Date: Sun Jan 17 13:17:10 2010
New Revision: 900135

URL: http://svn.apache.org/viewvc?rev=900135&view=rev
Log:
QPID-2321 : Updated the conflation queue to use the same argument names as C++ for LVQ (last value queue); added support for "browse only" consumers

Added:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Sun Jan 17 13:17:10 2010
@@ -68,11 +68,6 @@
         return _config.getInt("priorities", -1);
     }
 
-    public String getConflationKey()
-    {
-        return _config.getString("conflationKey", null);
-    }
-
     public String getExchange()
     {
         return _config.getString("exchange", null);
@@ -123,4 +118,14 @@
         return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
     }
 
+    public boolean isLVQ()
+    {
+        return _config.getBoolean("lvq", false);
+    }
+
+    public String getLVQKey()
+    {
+        return _config.getString("lvqKey", null);
+    }
+
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun Jan 17 13:17:10 2010
@@ -52,19 +52,27 @@
         while(subIter.advance() && !entry.isAcquired())
         {
             final Subscription subscription = subIter.getNode().getSubscription();
-            QueueEntry subnode = subscription.getLastSeenEntry();
-            while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired())
+            if(!subscription.isClosed())
             {
-                if(subscription.setLastSeenEntry(subnode,entry))
+                QueueContext context = (QueueContext) subscription.getQueueContext();
+                if(context != null)
                 {
-                    break;
-                }
-                else
-                {
-                    subnode = subscription.getLastSeenEntry();
+                    QueueEntry subnode = context._lastSeenEntry;
+                    QueueEntry released = context._releasedEntry;
+                    while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+                    {
+                        if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
+                        {
+                            break;
+                        }
+                        else
+                        {
+                            subnode = context._lastSeenEntry;
+                            released = context._releasedEntry;
+                        }
+                    }
                 }
             }
-
         }
     }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun Jan 17 13:17:10 2010
@@ -40,6 +40,10 @@
 
 public interface AMQQueue extends Managable, Comparable<AMQQueue>
 {
+    public interface Context
+    {
+        QueueEntry getLastSeenEntry();
+    }    
 
     AMQShortString getName();
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Jan 17 13:17:10 2010
@@ -33,7 +33,9 @@
 public class AMQQueueFactory
 {
     public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
-    public static final AMQShortString X_QPID_CONFLATION_KEY = new AMQShortString ("x-qpid-conflation-key");
+    private static final AMQShortString QPID_LAST_VALUE_QUEUE = new AMQShortString ("qpid.last_value_queue");
+    private static final AMQShortString QPID_LAST_VALUE_QUEUE_KEY = new AMQShortString("qpid.last_value_queue_key");
+    private static final String QPID_LVQ_KEY = "qpid.LVQ_key";
 
     private abstract static class QueueProperty
     {
@@ -134,7 +136,17 @@
             throws AMQException
     {
         final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
-        final String conflationKey = arguments == null ? null : arguments.containsKey(X_QPID_CONFLATION_KEY) ? arguments.getString(X_QPID_CONFLATION_KEY) : null;
+        String conflationKey = null;
+
+        if(arguments != null && (arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)))
+        {
+            conflationKey = arguments.getString(QPID_LAST_VALUE_QUEUE_KEY);
+            if(conflationKey == null)
+            {
+                conflationKey = QPID_LVQ_KEY;
+            }
+        }
+
 
         AMQQueue q = null;
         if(conflationKey != null)
@@ -190,14 +202,15 @@
             }
             arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
         }
-        String conflationKey = config.getConflationKey();
-        if(conflationKey != null)
+        if(config.isLVQ() || config.getLVQKey() != null)
         {
             if(arguments == null)
             {
                 arguments = new FieldTable();
             }
-            arguments.put(new AMQShortString("x-qpid-conflation-key"), conflationKey);
+            arguments.setInteger(QPID_LAST_VALUE_QUEUE, 1);
+            arguments.setString(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+
         }
 
         AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments);

Added: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=900135&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Sun Jan 17 13:17:10 2010
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+final class QueueContext implements AMQQueue.Context
+{
+    volatile QueueEntry _lastSeenEntry;
+    volatile QueueEntry _releasedEntry;
+
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+            _lastSeenUpdater =
+        AtomicReferenceFieldUpdater.newUpdater
+        (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+            _releasedUpdater =
+        AtomicReferenceFieldUpdater.newUpdater
+        (QueueContext.class, QueueEntry.class, "_releasedEntry");
+
+    public QueueContext(QueueEntry head)
+    {
+        _lastSeenEntry = head;
+    }
+
+    public QueueEntry getLastSeenEntry()
+    {
+        return _lastSeenEntry;
+    }
+}

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Jan 17 13:17:10 2010
@@ -295,7 +295,7 @@
 
         _activeSubscriberCount.incrementAndGet();
         subscription.setStateListener(this);
-        subscription.setLastSeenEntry(null, _entries.getHead());
+        subscription.setQueueContext(new QueueContext(_entries.getHead()));
 
         if (!isDeleted())
         {
@@ -330,12 +330,7 @@
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
 
-            QueueEntry lastSeen;
-
-            while ((lastSeen = subscription.getLastSeenEntry()) != null)
-            {
-                subscription.setLastSeenEntry(lastSeen, null);
-            }
+            subscription.setQueueContext(null);
 
             // auto-delete queues must be deleted if there are no remaining subscribers
 
@@ -520,68 +515,42 @@
             _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
         }
         sub.send(entry);
+
+        setLastSeenEntry(sub,entry);
     }
 
-    private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
+    private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)  throws AMQException
     {
+        return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
+    }
 
-        // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
-        // interest in.
-        QueueEntry node = sub.getLastSeenEntry();
-        while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
-        {
-
-            QueueEntry newNode = _entries.next(node);
-            if (newNode != null)
-            {
-                sub.setLastSeenEntry(node, newNode);
-                node = sub.getLastSeenEntry();
-            }
-            else
-            {
-                node = null;
-                break;
-            }
-
-        }
+    private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+    {
+        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        QueueEntry releasedEntry = subContext._releasedEntry;
 
-        if (node == entry)
+        QueueContext._lastSeenUpdater.set(subContext, entry);
+        if(releasedEntry == entry)
         {
-            // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
-            // good
-            return true;
+           QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
         }
-        else
-        {
-            // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
-            // no-one else has updated it to something furhter on in the list
-            //TODO - check
-            //updateLastSeenEntry(sub, entry);
-            return false;
-        }
-
     }
 
-    private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
+    private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
     {
-        QueueEntry node = sub.getLastSeenEntry();
-
-        if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        if(subContext != null)
         {
-            do
+            QueueEntry oldEntry;
+
+            while((oldEntry  = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
             {
-                if (sub.setLastSeenEntry(node, entry))
-                {
-                    return;
-                }
-                else
+                if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
                 {
-                    node = sub.getLastSeenEntry();
+                    break;
                 }
             }
-            while (node != null && entry.compareTo(node) < 0);
         }
-
     }
 
     public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
@@ -596,7 +565,7 @@
             // we don't make browsers send the same stuff twice
             if (!sub.isBrowser())
             {
-                updateLastSeenEntry(sub, entry);
+                updateSubRequeueEntry(sub, entry);
             }
         }
 
@@ -1408,16 +1377,12 @@
     private boolean attemptDelivery(Subscription sub) throws AMQException
     {
         boolean atTail = false;
-        boolean advanced = false;
         boolean subActive = sub.isActive() && !sub.isSuspended();
         if (subActive)
         {
-            QueueEntry node = moveSubscriptionToNextNode(sub);
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity());
-            }
-            if (!(node.isAcquired() || node.isDeleted()))
+            QueueEntry node  = getNextAvailableEntry(sub);
+
+            if (node != null && !(node.isAcquired() || node.isDeleted()))
             {
                 if (sub.hasInterest(node))
                 {
@@ -1430,19 +1395,6 @@
                         else
                         {
                             deliverMessage(sub, node);
-
-                            if (sub.isBrowser())
-                            {
-                                QueueEntry newNode = _entries.next(node);
-
-                                if (newNode != null)
-                                {
-                                    advanced = true;
-                                    sub.setLastSeenEntry(node, newNode);
-                                    node = sub.getLastSeenEntry();
-                                }
-                                
-                            }
                         }
 
                     }
@@ -1462,17 +1414,8 @@
                         node.addStateChangeListener(new QueueEntryListener(sub, node));
                     }
                 }
-                else
-                {
-                    // this subscription is not interested in this node so we can skip over it
-                    QueueEntry newNode = _entries.next(node);
-                    if (newNode != null)
-                    {
-                        sub.setLastSeenEntry(node, newNode);
-                    }
-                }
             }
-            atTail = (_entries.next(node) == null) && !advanced;
+            atTail = (node == null) || (_entries.next(node) == null);
         }
         return atTail || !subActive;
     }
@@ -1484,44 +1427,55 @@
         {
             SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
             Subscription sub = subNode.getSubscription();
-            moveSubscriptionToNextNode(sub);
+            if(!sub.isBrowser())
+            {
+                getNextAvailableEntry(sub);
+            }
+            else
+            {
+                // TODO
+            }
         }
     }
 
-    private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
+    private QueueEntry getNextAvailableEntry(final Subscription sub)
             throws AMQException
     {
-        QueueEntry node = sub.getLastSeenEntry();
-
-        while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
+        QueueContext context = (QueueContext) sub.getQueueContext();
+        if(context != null)
         {
-            if (!node.isAcquired() && !node.isDeleted() && node.expired())
+            QueueEntry lastSeen = context._lastSeenEntry;
+            QueueEntry releasedNode = context._releasedEntry;
+
+            QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+
+            boolean expired = false;
+            while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
             {
-                if (node.acquire())
+                if (expired)
                 {
-                    final StoreContext reapingStoreContext = new StoreContext();
-                    node.discard(reapingStoreContext);
+                    expired = false;
+                    if (node.acquire())
+                    {
+                        node.discard(new StoreContext());
+                    }
                 }
-            }
-            QueueEntry newNode = _entries.next(node);
-            if (newNode != null)
-            {
-                sub.setLastSeenEntry(node, newNode);
-                node = sub.getLastSeenEntry();
-            }
-            else
-            {
-                break;
-            }
 
-        }
+                if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
+                {
+                    QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
+                }
 
-        if (_logger.isDebugEnabled())
+                lastSeen = context._lastSeenEntry;
+                releasedNode = context._releasedEntry;
+                node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
+            }
+            return node;
+        }
+        else
         {
-            _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+            return null;
         }
-
-        return node;
     }
 
     private void processQueue(Runnable runner) throws AMQException

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sun Jan 17 13:17:10 2010
@@ -87,9 +87,9 @@
 
     public State getState();
 
-    QueueEntry getLastSeenEntry();
+    AMQQueue.Context getQueueContext();
 
-    boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+    void setQueueContext(AMQQueue.Context queueContext);
 
 
     boolean isActive();

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Jan 17 13:17:10 2010
@@ -65,7 +65,7 @@
 
 
     private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
-    private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+    private AMQQueue.Context _queueContext;
     private final ClientDeliveryMethod _deliveryMethod;
     private final RecordDeliveryMethod _recordMethod;
 
@@ -627,23 +627,14 @@
         _stateListener = listener;
     }
 
-
-    public QueueEntry getLastSeenEntry()
+    public AMQQueue.Context getQueueContext()
     {
-        QueueEntry entry = _queueContext.get();
-
-        if(_logger.isDebugEnabled())
-        {
-            _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity()));
-        }        
-
-        return entry;
+        return _queueContext;
     }
 
-    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
+    public void setQueueContext(AMQQueue.Context context)
     {
-        _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity()));
-        return _queueContext.compareAndSet(expected,newvalue);
+        _queueContext = context;
     }
 
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Sun Jan 17 13:17:10 2010
@@ -186,7 +186,7 @@
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(null, messageA);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
         
         // Check removing the subscription removes it's information from the queue
         _queue.unregisterSubscription(_subscription);
@@ -197,8 +197,8 @@
         
         AMQMessage messageB = createMessage(new Long (25));
         _queue.enqueue(null, messageB);
-        QueueEntry entry = _subscription.getLastSeenEntry();
-        assertNull(entry);
+
+        assertNull(_subscription.getQueueContext());
     }
     
     public void testQueueNoSubscriber() throws AMQException, InterruptedException
@@ -207,7 +207,7 @@
         _queue.enqueue(null, messageA);
         _queue.registerSubscription(_subscription, false);
         Thread.sleep(150);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
     }
 
     public void testExclusiveConsumer() throws AMQException
@@ -224,7 +224,7 @@
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(null, messageA);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
         
         // Check we cannot add a second subscriber to the queue
         Subscription subB = new MockSubscription();
@@ -273,7 +273,7 @@
         Long id = new Long(26);
         AMQMessage message = createMessage(id);
         _queue.enqueue(null, message);
-        QueueEntry entry = _subscription.getLastSeenEntry();
+        QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
         entry.setRedelivered(true);
         _queue.resend(entry, _subscription);
         

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sun Jan 17 13:17:10 2010
@@ -42,7 +42,7 @@
     private AMQShortString tag = new AMQShortString("mocktag");
     private AMQQueue queue = null;
     private StateListener _listener = null;
-    private QueueEntry lastSeen = null;
+    private AMQQueue.Context _queueContext = null;
     private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
@@ -81,11 +81,6 @@
         return _subscriptionID;
     }
 
-    public QueueEntry getLastSeenEntry()
-    {
-        return lastSeen;
-    }
-
     public SubscriptionAcquiredState getOwningState()
     {
         return new QueueEntry.SubscriptionAcquiredState(this);
@@ -155,21 +150,9 @@
 
     public void send(QueueEntry msg) throws AMQException
     {
-        lastSeen = msg;
         messages.add(msg);
     }
 
-    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
-    {
-        boolean result = false;
-        if (expected != null)
-        {
-            result = (expected.equals(lastSeen));
-        }
-        lastSeen = newValue;
-        return result;
-    }
-
     public void setQueue(AMQQueue queue, boolean exclusive)
     {
         this.queue = queue;
@@ -185,6 +168,16 @@
         return _state;
     }
 
+    public AMQQueue.Context getQueueContext()
+    {
+        return _queueContext;
+    }
+
+    public void setQueueContext(AMQQueue.Context queueContext)
+    {
+        _queueContext = queueContext;
+    }
+
     public boolean wouldSuspend(QueueEntry msg)
     {
         return false;

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Sun Jan 17 13:17:10 2010
@@ -47,6 +47,8 @@
 
     protected final boolean _isAutoDelete;
 
+    private final boolean _browseOnly;
+
     private AMQShortString _queueName;
 
     private AMQShortString _routingKey;
@@ -82,6 +84,7 @@
         _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
         _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
         _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+        _browseOnly = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_BROWSE));
         _queueName = binding.getQueueName() == null ? null : binding.getQueueName();
         _routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey();
         _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
@@ -122,6 +125,12 @@
     protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
                              boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
     {
+        this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,bindingKeys, false);
+    }
+
+    protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+                             boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys, boolean browseOnly)
+    {
         if ( (ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(exchangeClass) || 
               ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exchangeClass))  
               && routingKey == null)
@@ -144,6 +153,7 @@
         _queueName = queueName;
         _isDurable = isDurable;
         _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
+        _browseOnly = browseOnly;
     }
 
     public AMQShortString getEncodedName()
@@ -502,4 +512,9 @@
             return new AMQAnyDestination(binding);
         }
     }
+
+    public boolean isBrowseOnly()
+    {
+        return _browseOnly;  // final field: set from the binding URL "browse" option or the browseOnly constructor argument
+    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Jan 17 13:17:10 2010
@@ -902,7 +902,7 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
-                                  false, false);
+                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public C createExclusiveConsumer(Destination destination) throws JMSException
@@ -910,7 +910,7 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
-                                  false, false);
+                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -918,7 +918,7 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
-                                  messageSelector, null, false, false);
+                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -927,7 +927,7 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
-                                  messageSelector, null, false, false);
+                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -944,7 +944,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -952,7 +952,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -960,7 +960,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -968,7 +968,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()),
                                   false);
     }
 

Modified: qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Sun Jan 17 13:17:10 2010
@@ -31,6 +31,7 @@
     public static final String OPTION_EXCLUSIVE = "exclusive";
     public static final String OPTION_AUTODELETE = "autodelete";
     public static final String OPTION_DURABLE = "durable";
+    public static final String OPTION_BROWSE = "browse";  // read by AMQDestination via Boolean.parseBoolean to set its browse-only flag
     public static final String OPTION_CLIENTID = "clientid";
     public static final String OPTION_SUBSCRIPTION = "subscription";
     public static final String OPTION_ROUTING_KEY = "routingkey";

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java Sun Jan 17 13:17:10 2010
@@ -27,6 +27,7 @@
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.framing.AMQShortString;
 
 import javax.naming.Context;
@@ -43,6 +44,7 @@
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.HashMap;
+import java.net.URISyntaxException;
 
 import junit.framework.TestCase;
 
@@ -118,7 +120,7 @@
 
 
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-conflation-key","key");
+        arguments.put("qpid.last_value_queue_key","key");
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
         queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
@@ -163,7 +165,7 @@
 
 
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-conflation-key","key");
+        arguments.put("qpid.last_value_queue_key","key");
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
         queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
@@ -239,7 +241,7 @@
 
 
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-conflation-key","key");
+        arguments.put("qpid.last_value_queue_key","key");
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
         queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
@@ -310,6 +312,67 @@
 
     }
 
+    public void testConflationBrowser() throws JMSException, NamingException, AMQException, InterruptedException, URISyntaxException
+    {
+        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        // Exercises a consumer created from a browse='true' binding URL on a queue created with qpid.last_value_queue_key.
+        // NOTE(review): this method makes no assertions; it only prints what it receives to stderr.
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("qpid.last_value_queue_key","key");
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+        // Publish MSG_COUNT messages, logging and sleeping for a second whenever msg is a multiple of 10000 (including 0).
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.send(nextMessage(msg, false, producerSession, producer));
+            if(msg%10000 == 0)
+            {
+                System.err.println("Sent... " + msg);
+                Thread.sleep(1000);
+            }
+
+        }
+
+        ((AMQSession)producerSession).sync();
+
+        // Build a destination whose binding URL carries browse='true' and create the consumer on it.
+        AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'");
+        AMQQueue browseQueue = new AMQQueue(url);
+
+        consumer = consumerSession.createConsumer(browseQueue);
+        consumerConnection.start();
+        Message received;
+        int receivedCount = 0;
+        while((received = consumer.receive(1000))!=null)
+        {
+            receivedCount++;
+            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+        }
+
+        System.err.println("Received Count: " + receivedCount);
+        // Send one further message, then keep reading through the same consumer (receivedCount is cumulative).
+        producer.send(nextMessage(MSG_COUNT, false, producerSession, producer));
+
+        ((AMQSession)producerSession).sync();
+
+        while((received = consumer.receive(5000))!=null)
+        {
+            receivedCount++;
+            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+        }
+
+        System.err.println("Received Count: " + receivedCount);
+        // NOTE(review): only the producer-side resources are closed below; this method does not close consumerConnection.
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+
+
+    }
 
 
 

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun Jan 17 13:17:10 2010
@@ -117,6 +117,16 @@
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public AMQQueue.Context getQueueContext()
+    {
+        return null;  // Stub implementation: always returns null.
+    }
+
+    public void setQueueContext(AMQQueue.Context queueContext)
+    {
+        // Stub implementation: the supplied queueContext is ignored.
+    }
+
     public QueueEntry getLastSeenEntry()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org