You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/04 21:30:32 UTC

svn commit: r1564470 - in /qpid/branches/java-broker-amqp-1-0-management/java: ./ broker-core/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/...

Author: rgodfrey
Date: Tue Feb  4 20:30:32 2014
New Revision: 1564470

URL: http://svn.apache.org/r1564470
Log:
flattened AbstractSubscription down into QueueSubscription, removed the setting of context from the subscription public api

Removed:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
Modified:
    qpid/branches/java-broker-amqp-1-0-management/java/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
    qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1563432-1564446

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core:r1562456-1564446

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Feb  4 20:30:32 2014
@@ -727,4 +727,6 @@ public abstract class AbstractExchange i
 
     }
 
+
+
 }

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1562456-1564446

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Feb  4 20:30:32 2014
@@ -70,11 +70,6 @@ public interface AMQQueue extends Compar
 
     long getTotalEnqueueCount();
 
-    public interface Context
-    {
-        QueueEntry getLastSeenEntry();
-    }
-
     void setNoLocal(boolean b);
 
     boolean isAutoDelete();
@@ -92,8 +87,6 @@ public interface AMQQueue extends Compar
                                       final Class<? extends ServerMessage> messageClass,
                                       final String consumerName, EnumSet<Subscription.Option> options) throws AMQException;
 
-    void unregisterSubscription(final Subscription subscription) throws AMQException;
-
     Collection<Subscription> getConsumers();
 
     interface SubscriptionRegistrationListener

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Tue Feb  4 20:30:32 2014
@@ -44,10 +44,10 @@ public abstract class OutOfOrderQueue ex
         SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
         while(subIter.advance() && !entry.isAcquired())
         {
-            final Subscription subscription = subIter.getNode().getSubscription();
+            final QueueSubscription subscription = subIter.getNode().getSubscription();
             if(!subscription.isClosed())
             {
-                QueueContext context = (QueueContext) subscription.getQueueContext();
+                QueueContext context = subscription.getQueueContext();
                 if(context != null)
                 {
                     QueueEntry released = context.getReleasedEntry();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Tue Feb  4 20:30:32 2014
@@ -23,7 +23,7 @@ package org.apache.qpid.server.queue;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-final class QueueContext implements AMQQueue.Context
+final class QueueContext
 {
     private volatile QueueEntry _lastSeenEntry;
     private volatile QueueEntry _releasedEntry;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java Tue Feb  4 20:30:32 2014
@@ -30,32 +30,43 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.subscription.AbstractSubscription;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionTarget;
 import org.apache.qpid.server.util.StateChangeListener;
 
 import java.text.MessageFormat;
 import java.util.EnumMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
 
-class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscription
+class QueueSubscription<T extends SubscriptionTarget> implements Subscription
 {
     private static final Logger _logger = Logger.getLogger(QueueSubscription.class);
     private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
+    private final long _subscriptionID;
+    private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+    private final Lock _stateChangeLock = new ReentrantLock();
+    private final long _createTime = System.currentTimeMillis();
+    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+    private final boolean _acquires;
+    private final boolean _seesRequeues;
+    private final String _consumerName;
+    private final boolean _isTransient;
+    private final AtomicLong _deliveredCount = new AtomicLong(0);
+    private final AtomicLong _deliveredBytes = new AtomicLong(0);
+    private final FilterManager _filters;
+    private final Class<? extends ServerMessage> _messageClass;
+    private final Object _sessionReference;
     private SimpleAMQQueue _queue;
-    private String _traceExclude;
-    private String _trace;
     private GenericActor _logActor;
-    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
-
 
     static final EnumMap<SubscriptionTarget.State, State> STATE_MAP =
             new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class);
@@ -69,6 +80,15 @@ class QueueSubscription<T extends Subscr
 
     private final T _target;
     private final SubFlushRunner _runner = new SubFlushRunner(this);
+    private volatile QueueContext _queueContext;
+    private StateChangeListener<? extends Subscription, State> _stateListener = new StateChangeListener<Subscription, State>()
+    {
+        public void stateChanged(Subscription sub, State oldState, State newState)
+        {
+            CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+        }
+    };
+    private boolean _noLocal;
 
     QueueSubscription(final FilterManager filters,
                       final Class<? extends ServerMessage> messageClass,
@@ -78,8 +98,14 @@ class QueueSubscription<T extends Subscr
                       final boolean isTransient,
                       T target)
     {
-        super(filters, messageClass, target.getSessionModel().getConnectionReference(),
-              acquires, seesRequeues, consumerName, isTransient);
+        _messageClass = messageClass;
+        _sessionReference = target.getSessionModel().getConnectionReference();
+        _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
+        _filters = filters;
+        _acquires = acquires;
+        _seesRequeues = seesRequeues;
+        _consumerName = consumerName;
+        _isTransient = isTransient;
         _target = target;
         _target.setStateListener(
                 new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>()
@@ -187,12 +213,6 @@ class QueueSubscription<T extends Subscr
     }
 
     @Override
-    protected void doSend(final QueueEntry entry, final boolean batch) throws AMQException
-    {
-        _target.send(entry, batch);
-    }
-
-    @Override
     public void flushBatched()
     {
         _target.flushBatched();
@@ -241,9 +261,6 @@ class QueueSubscription<T extends Subscr
         }
         _queue = queue;
 
-        _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
-        _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
-
         String queueString = new QueueLogSubject(_queue).toLogString();
 
         _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID())
@@ -263,18 +280,6 @@ class QueueSubscription<T extends Subscr
         }
     }
 
-
-    protected final String getTraceExclude()
-    {
-        return _traceExclude;
-    }
-
-    protected final String getTrace()
-    {
-        return _trace;
-    }
-
-
     protected final LogSubject getLogSubject()
     {
         return _logActor.getLogSubject();
@@ -303,4 +308,166 @@ class QueueSubscription<T extends Subscr
         return _runner;
     }
 
+    public final long getSubscriptionID()
+    {
+        return _subscriptionID;
+    }
+
+    public final StateChangeListener<? extends Subscription, State> getStateListener()
+    {
+        return _stateListener;
+    }
+
+    public final void setStateListener(StateChangeListener<? extends Subscription, State> listener)
+    {
+        _stateListener = listener;
+    }
+
+    final QueueContext getQueueContext()
+    {
+        return _queueContext;
+    }
+
+    final void setQueueContext(QueueContext queueContext)
+    {
+        _queueContext = queueContext;
+    }
+
+    protected boolean updateState(State from, State to)
+    {
+        return _state.compareAndSet(from, to);
+    }
+
+    public final boolean isActive()
+    {
+        return getState() == State.ACTIVE;
+    }
+
+    public final boolean isClosed()
+    {
+        return getState() == State.CLOSED;
+    }
+
+    public final void setNoLocal(boolean noLocal)
+    {
+        _noLocal = noLocal;
+    }
+
+    public final boolean hasInterest(QueueEntry entry)
+    {
+       //check that the message hasn't been rejected
+        if (entry.isRejectedBy(getSubscriptionID()))
+        {
+
+            return false;
+        }
+
+        if (entry.getMessage().getClass() == _messageClass)
+        {
+            if(_noLocal)
+            {
+                Object connectionRef = entry.getMessage().getConnectionReference();
+                if (connectionRef != null && connectionRef == _sessionReference)
+                {
+                    return false;
+                }
+            }
+        }
+        else
+        {
+            // no interest in messages we can't convert
+            if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(),
+                                                                              _messageClass)==null)
+            {
+                return false;
+            }
+        }
+        return (_filters == null) || _filters.allAllow(entry.asFilterable());
+    }
+
+    protected String getFilterLogString()
+    {
+        StringBuilder filterLogString = new StringBuilder();
+        String delimiter = ", ";
+        boolean hasEntries = false;
+        if (_filters != null && _filters.hasFilters())
+        {
+            filterLogString.append(_filters.toString());
+            hasEntries = true;
+        }
+
+        if (!acquires())
+        {
+            if (hasEntries)
+            {
+                filterLogString.append(delimiter);
+            }
+            filterLogString.append("Browser");
+            hasEntries = true;
+        }
+
+        return filterLogString.toString();
+    }
+
+    public final boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
+
+    public final void getSendLock()
+    {
+        _stateChangeLock.lock();
+    }
+
+    public final void releaseSendLock()
+    {
+        _stateChangeLock.unlock();
+    }
+
+    public final long getCreateTime()
+    {
+        return _createTime;
+    }
+
+    public final QueueEntry.SubscriptionAcquiredState getOwningState()
+    {
+        return _owningState;
+    }
+
+    public final boolean acquires()
+    {
+        return _acquires;
+    }
+
+    public final boolean seesRequeues()
+    {
+        return _seesRequeues;
+    }
+
+    public final String getName()
+    {
+        return _consumerName;
+    }
+
+    public final boolean isTransient()
+    {
+        return _isTransient;
+    }
+
+    public final long getBytesOut()
+    {
+        return _deliveredBytes.longValue();
+    }
+
+    public final long getMessagesOut()
+    {
+        return _deliveredCount.longValue();
+    }
+
+    public final void send(final QueueEntry entry, final boolean batch) throws AMQException
+    {
+        _deliveredCount.incrementAndGet();
+        _deliveredBytes.addAndGet(entry.getMessage().getSize());
+        _target.send(entry, batch);
+    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Feb  4 20:30:32 2014
@@ -382,7 +382,7 @@ public class SimpleAMQQueue implements A
 
 
     @Override
-    public Subscription registerSubscription(final SubscriptionTarget target,
+    public QueueSubscription registerSubscription(final SubscriptionTarget target,
                                              final FilterManager filters,
                                              final Class<? extends ServerMessage> messageClass,
                                              final String consumerName,
@@ -471,7 +471,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
+    synchronized void unregisterSubscription(final QueueSubscription  subscription) throws AMQException
     {
         if (subscription == null)
         {
@@ -828,7 +828,7 @@ public class SimpleAMQQueue implements A
 
     private void setLastSeenEntry(final QueueSubscription sub, final QueueEntry entry)
     {
-        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        QueueContext subContext = sub.getQueueContext();
         if (subContext != null)
         {
             QueueEntry releasedEntry = subContext.getReleasedEntry();
@@ -844,7 +844,7 @@ public class SimpleAMQQueue implements A
     private void updateSubRequeueEntry(final QueueSubscription sub, final QueueEntry entry)
     {
 
-        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        QueueContext subContext = sub.getQueueContext();
         if(subContext != null)
         {
             QueueEntry oldEntry;
@@ -1611,7 +1611,7 @@ public class SimpleAMQQueue implements A
     private QueueEntry getNextAvailableEntry(final QueueSubscription sub)
             throws AMQException
     {
-        QueueContext context = (QueueContext) sub.getQueueContext();
+        QueueContext context = sub.getQueueContext();
         if(context != null)
         {
             QueueEntry lastSeen = context.getLastSeenEntry();
@@ -1651,7 +1651,7 @@ public class SimpleAMQQueue implements A
 
     public boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub)
     {
-        QueueContext context = (QueueContext) sub.getQueueContext();
+        QueueContext context = sub.getQueueContext();
         if(context != null)
         {
             QueueEntry releasedNode = context.getReleasedEntry();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java Tue Feb  4 20:30:32 2014
@@ -106,11 +106,6 @@ public interface Subscription
 
     public State getState();
 
-    AMQQueue.Context getQueueContext();
-
-    void setQueueContext(AMQQueue.Context queueContext);
-
-
     boolean isActive();
 
     void queueEmpty() throws AMQException;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Tue Feb  4 20:30:32 2014
@@ -109,7 +109,6 @@ public class DistributedTransaction impl
         {
             _branch.enqueue(queue, message);
             _branch.addPostTransactionAction(postTransactionAction);
-            enqueue(Collections.singletonList(queue), message, postTransactionAction);
         }
         else
         {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Feb  4 20:30:32 2014
@@ -69,7 +69,7 @@ public class SimpleAMQQueueTest extends 
     private String _routingKey = "routing key";
     private DirectExchange _exchange;
     private MockSubscription _subscriptionTarget = new MockSubscription();
-    private Subscription _subscription;
+    private QueueSubscription _subscription;
     private Map<String,Object> _arguments = null;
 
     @Override
@@ -183,7 +183,7 @@ public class SimpleAMQQueueTest extends 
         {
         }
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
-        assertNull(((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
+        assertNull(_subscription.getQueueContext().getReleasedEntry());
 
         // Check removing the subscription removes it's information from the queue
         _subscription.close();
@@ -206,7 +206,7 @@ public class SimpleAMQQueueTest extends 
                                                     EnumSet.noneOf(Subscription.Option.class));
         Thread.sleep(150);
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
-        assertNull("There should be no releasedEntry after an enqueue", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
+        assertNull("There should be no releasedEntry after an enqueue", _subscription.getQueueContext().getReleasedEntry());
     }
 
     /**
@@ -222,7 +222,7 @@ public class SimpleAMQQueueTest extends 
                                                     EnumSet.noneOf(Subscription.Option.class));
         Thread.sleep(150);
         assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
-        assertNull("There should be no releasedEntry after enqueues", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
+        assertNull("There should be no releasedEntry after enqueues",  _subscription.getQueueContext().getReleasedEntry());
     }
 
     /**
@@ -273,7 +273,7 @@ public class SimpleAMQQueueTest extends 
         assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
         assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry());
     }
 
     /**
@@ -321,7 +321,7 @@ public class SimpleAMQQueueTest extends 
         assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
         assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry());
 
     }
 
@@ -375,7 +375,7 @@ public class SimpleAMQQueueTest extends 
         assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
         assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry());
     }
 
 
@@ -392,11 +392,11 @@ public class SimpleAMQQueueTest extends 
         MockSubscription target2 = new MockSubscription();
 
 
-        Subscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
+        QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
                                                                  EnumSet.of(Subscription.Option.ACQUIRES,
                                                                             Subscription.Option.SEES_REQUEUES));
 
-        Subscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
+        QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
                                                                  EnumSet.of(Subscription.Option.ACQUIRES,
                                                                             Subscription.Option.SEES_REQUEUES));
 
@@ -430,8 +430,8 @@ public class SimpleAMQQueueTest extends 
         assertEquals("Unexpected total number of messages sent to both subscriptions after release",
                      3,
                      target1.getMessages().size() + target2.getMessages().size());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", subscription1.getQueueContext().getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", subscription2.getQueueContext().getReleasedEntry());
     }
 
     public void testExclusiveConsumer() throws AMQException

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Feb  4 20:30:32 2014
@@ -1216,68 +1216,7 @@ public class AMQChannel implements AMQSe
         return getId().compareTo(o.getId());
     }
 
-    private class MessageDeliveryAction implements ServerTransaction.Action
-    {
-        private final MessageReference<AMQMessage> _reference;
-        private List<? extends BaseQueue> _destinationQueues;
-
-        public MessageDeliveryAction(AMQMessage currentMessage,
-                                     List<? extends BaseQueue> destinationQueues)
-        {
-            _reference = currentMessage.newReference();
-            _destinationQueues = destinationQueues;
-        }
-
-        public void postCommit()
-        {
-            try
-            {
-                AMQMessage message = _reference.getMessage();
-                final boolean immediate = message.isImmediate();
-
-                for(int i = 0; i < _destinationQueues.size(); i++)
-                {
-                    BaseQueue queue = _destinationQueues.get(i);
-
-                    Action<QueueEntry> action;
-
-                    if(immediate)
-                    {
-                        action = new ImmediateAction();
-                    }
-                    else
-                    {
-                        action = null;
-                    }
-
-                    queue.enqueue(message, action);
 
-                    if(queue instanceof AMQQueue)
-                    {
-                        ((AMQQueue)queue).checkCapacity(AMQChannel.this);
-                    }
-
-                }
-
-                message.getStoredMessage().flushToStore();
-                _reference.release();
-            }
-            catch (AMQException e)
-            {
-                // TODO
-                throw new RuntimeException(e);
-            }
-        }
-
-        public void onRollback()
-        {
-            // Maybe keep track of entries that were created and then delete them here in case of failure
-            // to in memory enqueue
-            _reference.release();
-        }
-
-
-    }
     private class ImmediateAction implements Action<QueueEntry>
     {
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java Tue Feb  4 20:30:32 2014
@@ -38,13 +38,11 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.AbstractSubscription;
 import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml?rev=1564470&r1=1564469&r2=1564470&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml Tue Feb  4 20:30:32 2014
@@ -39,6 +39,12 @@
     <artifact name="qpid-broker" type="jar.asc" ext="jar.asc"/>
     <artifact name="qpid-broker" type="source" ext="jar" e:classifier="sources"/>
     <artifact name="qpid-broker" type="source.asc" ext="jar.asc" e:classifier="sources"/>
+    <artifact name="qpid-broker-core" type="pom" ext="pom"/>
+    <artifact name="qpid-broker-core" type="pom.asc" ext="pom.asc"/>
+    <artifact name="qpid-broker-core" type="jar" ext="jar"/>
+    <artifact name="qpid-broker-core" type="jar.asc" ext="jar.asc"/>
+    <artifact name="qpid-broker-core" type="source" ext="jar" e:classifier="sources"/>
+    <artifact name="qpid-broker-core" type="source.asc" ext="jar.asc" e:classifier="sources"/>
     <artifact name="qpid-broker-plugins-access-control" type="pom" ext="pom"/>
     <artifact name="qpid-broker-plugins-access-control" type="pom.asc" ext="pom.asc"/>
     <artifact name="qpid-broker-plugins-access-control" type="jar" ext="jar"/>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org