You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/03 09:14:20 UTC

svn commit: r1563796 - in /qpid/branches/java-broker-amqp-1-0-management/java: ./ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Author: rgodfrey
Date: Mon Feb  3 08:14:20 2014
New Revision: 1563796

URL: http://svn.apache.org/r1563796
Log:
Updates to subscription for 1.0 sessions

Added:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
      - copied, changed from r1563758, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
Removed:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
Modified:
    qpid/branches/java-broker-amqp-1-0-management/java/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1562456-1563431

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1563796&r1=1563795&r2=1563796&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Feb  3 08:14:20 2014
@@ -67,6 +67,8 @@ import org.apache.qpid.server.filter.Sim
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
@@ -79,7 +81,9 @@ public class SendingLink_1_0 implements 
     private VirtualHost _vhost;
     private SendingDestination _destination;
 
-    private Subscription_1_0 _subscription;
+    private Subscription _subscription;
+    private SubscriptionTarget_1_0 _target;
+
     private boolean _draining;
     private final Map<Binary, QueueEntry> _unsettledMap =
             new HashMap<Binary, QueueEntry>();
@@ -168,7 +172,16 @@ public class SendingLink_1_0 implements 
             }
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
 
-            _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY, messageFilter == null ? null : new SimpleFilterManager(messageFilter));
+            _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+            _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+                                                                               Message_1_0.class,
+                                                                               source.getDistributionMode() != StdDistMode.COPY,
+                                                                               source.getDistributionMode() != StdDistMode.COPY,
+                                                                               getEndpoint().getName(),
+                                                                               false,
+                                                                               _target);
+            _target.setSubscription(_subscription);
+
         }
         else if(destination instanceof ExchangeDestination)
         {
@@ -357,7 +370,17 @@ public class SendingLink_1_0 implements 
             {
                 _logger.error("Error", e);
             }
-            _subscription = new Subscription_1_0(this, qd, true, messageFilter == null ? null : new SimpleFilterManager(messageFilter));
+
+
+            _target = new SubscriptionTarget_1_0(this, true);
+            _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+                                                                               Message_1_0.class,
+                                                                               true,
+                                                                               true,
+                                                                               getEndpoint().getName(),
+                                                                               false,
+                                                                               _target);
+            _target.setSubscription(_subscription);
 
         }
 
@@ -441,7 +464,7 @@ public class SendingLink_1_0 implements 
         else if(detach == null || detach.getError() != null)
         {
             _linkAttachment = null;
-            _subscription.flowStateChanged();
+            _target.flowStateChanged();
         }
         else
         {
@@ -489,7 +512,7 @@ public class SendingLink_1_0 implements 
         }
         if(_resumeAcceptedTransfers.isEmpty())
         {
-            _subscription.flowStateChanged();
+            _target.flowStateChanged();
         }
 
     }
@@ -593,7 +616,7 @@ public class SendingLink_1_0 implements 
 
         if(_subscription.isActive())
         {
-            _subscription.suspend();
+            _target.suspend();
         }
 
         _linkAttachment = linkAttachment;

Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (from r1563758, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java&r1=1563758&r2=1563796&rev=1563796&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java Mon Feb  3 08:14:20 2014
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.amqp_1_0.codec.ValueHandler;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -41,18 +38,23 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.subscription.AbstractSubscription;
+import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
 
-class Subscription_1_0 extends AbstractSubscription implements Subscription
+import java.nio.ByteBuffer;
+import java.util.List;
+
+class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
 {
+    private final boolean _acquires;
     private SendingLink_1_0 _link;
 
     private long _deliveryTag = 0L;
@@ -60,15 +62,26 @@ class Subscription_1_0 extends AbstractS
     private Binary _transactionId;
     private final AMQPDescribedTypeRegistry _typeRegistry;
     private final SectionEncoder _sectionEncoder;
+    private Subscription _subscription;
 
-    public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires, FilterManager filters)
+    public SubscriptionTarget_1_0(final SendingLink_1_0 link,
+                                  boolean acquires)
     {
-        super(filters,Message_1_0.class,link.getSession().getConnectionReference(), acquires, acquires, link.getEndpoint().getName(), false);
+        super(State.SUSPENDED);
         _link = link;
         _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
         _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
-        setQueue(destination.getQueue(),false);
-        updateState(State.ACTIVE, State.SUSPENDED);
+        _acquires = acquires;
+    }
+
+    public void setSubscription(Subscription sub)
+    {
+        _subscription = sub;
+    }
+
+    public Subscription getSubscription()
+    {
+        return _subscription;
     }
 
     private SendingLinkEndpoint getEndpoint()
@@ -78,16 +91,16 @@ class Subscription_1_0 extends AbstractS
 
     public boolean isSuspended()
     {
-        return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
+        return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
 
     }
 
-    public void close()
+    public boolean close()
     {
         boolean closed = false;
         State state = getState();
 
-        getSendLock();
+        getSubscription().getSendLock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -97,19 +110,16 @@ class Subscription_1_0 extends AbstractS
                 {
                     state = getState();
                 }
-                else
-                {
-                    getStateListener().stateChanged(this, state, State.CLOSED);
-                }
             }
+            return closed;
         }
         finally
         {
-            releaseSendLock();
+            getSubscription().releaseSendLock();
         }
     }
 
-    protected void doSend(QueueEntry entry, boolean batch) throws AMQException
+    public void send(QueueEntry entry, boolean batch) throws AMQException
     {
         // TODO
         send(entry);
@@ -223,7 +233,7 @@ class Subscription_1_0 extends AbstractS
                 }
                 else
                 {
-                    UnsettledAction action = acquires()
+                    UnsettledAction action = _acquires
                                              ? new DispositionAction(tag, queueEntry)
                                              : new DoNothingAction(tag, queueEntry);
 
@@ -237,7 +247,7 @@ class Subscription_1_0 extends AbstractS
                     transfer.setState(state);
                 }
                 // TODO - need to deal with failure here
-                if(acquires() && _transactionId != null)
+                if(_acquires && _transactionId != null)
                 {
                     ServerTransaction txn = _link.getTransaction(_transactionId);
                     if(txn != null)
@@ -251,7 +261,7 @@ class Subscription_1_0 extends AbstractS
 
                             public void onRollback()
                             {
-                                if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+                                if(queueEntry.isAcquiredBy(getSubscription()))
                                 {
                                     queueEntry.release();
                                     _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -281,7 +291,7 @@ class Subscription_1_0 extends AbstractS
         getEndpoint().detach();
     }
 
-    public boolean wouldSuspend(final QueueEntry msg)
+    public boolean allocateCredit(final QueueEntry msg)
     {
         synchronized (_link.getLock())
         {
@@ -291,7 +301,7 @@ class Subscription_1_0 extends AbstractS
                 suspend();
             }
 
-            return !hasCredit;
+            return hasCredit;
         }
     }
 
@@ -300,10 +310,7 @@ class Subscription_1_0 extends AbstractS
     {
         synchronized(_link.getLock())
         {
-            if(updateState(State.ACTIVE, State.SUSPENDED))
-            {
-                getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
-            }
+            updateState(State.ACTIVE, State.SUSPENDED);
         }
     }
 
@@ -319,10 +326,7 @@ class Subscription_1_0 extends AbstractS
         {
             if(_link.drained())
             {
-                if(updateState(State.ACTIVE, State.SUSPENDED))
-                {
-                    getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
-                }
+                updateState(State.ACTIVE, State.SUSPENDED);
             }
         }
     }
@@ -333,10 +337,7 @@ class Subscription_1_0 extends AbstractS
         {
             if(isSuspended() && getEndpoint() != null)
             {
-                if(updateState(State.SUSPENDED, State.ACTIVE))
-                {
-                    getStateListener().stateChanged(this, State.SUSPENDED, State.ACTIVE);
-                }
+                updateState(State.SUSPENDED, State.ACTIVE);
                 _transactionId = _link.getTransactionId();
             }
         }
@@ -390,7 +391,7 @@ class Subscription_1_0 extends AbstractS
 
                             public void postCommit()
                             {
-                                if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
+                                if(_queueEntry.isAcquiredBy(getSubscription()))
                                 {
                                     _queueEntry.delete();
                                 }
@@ -500,7 +501,6 @@ class Subscription_1_0 extends AbstractS
     @Override
     public AMQSessionModel getSessionModel()
     {
-        // TODO
         return getSession();
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org