You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/03 09:14:20 UTC
svn commit: r1563796 - in
/qpid/branches/java-broker-amqp-1-0-management/java: ./
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
Author: rgodfrey
Date: Mon Feb 3 08:14:20 2014
New Revision: 1563796
URL: http://svn.apache.org/r1563796
Log:
Updates to subscription for 1.0 sessions
Added:
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
- copied, changed from r1563758, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
Removed:
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
Modified:
qpid/branches/java-broker-amqp-1-0-management/java/ (props changed)
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
Propchange: qpid/branches/java-broker-amqp-1-0-management/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1562456-1563431
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1563796&r1=1563795&r2=1563796&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Feb 3 08:14:20 2014
@@ -67,6 +67,8 @@ import org.apache.qpid.server.filter.Sim
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -79,7 +81,9 @@ public class SendingLink_1_0 implements
private VirtualHost _vhost;
private SendingDestination _destination;
- private Subscription_1_0 _subscription;
+ private Subscription _subscription;
+ private SubscriptionTarget_1_0 _target;
+
private boolean _draining;
private final Map<Binary, QueueEntry> _unsettledMap =
new HashMap<Binary, QueueEntry>();
@@ -168,7 +172,16 @@ public class SendingLink_1_0 implements
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY, messageFilter == null ? null : new SimpleFilterManager(messageFilter));
+ _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+ _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class,
+ source.getDistributionMode() != StdDistMode.COPY,
+ source.getDistributionMode() != StdDistMode.COPY,
+ getEndpoint().getName(),
+ false,
+ _target);
+ _target.setSubscription(_subscription);
+
}
else if(destination instanceof ExchangeDestination)
{
@@ -357,7 +370,17 @@ public class SendingLink_1_0 implements
{
_logger.error("Error", e);
}
- _subscription = new Subscription_1_0(this, qd, true, messageFilter == null ? null : new SimpleFilterManager(messageFilter));
+
+
+ _target = new SubscriptionTarget_1_0(this, true);
+ _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class,
+ true,
+ true,
+ getEndpoint().getName(),
+ false,
+ _target);
+ _target.setSubscription(_subscription);
}
@@ -441,7 +464,7 @@ public class SendingLink_1_0 implements
else if(detach == null || detach.getError() != null)
{
_linkAttachment = null;
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
else
{
@@ -489,7 +512,7 @@ public class SendingLink_1_0 implements
}
if(_resumeAcceptedTransfers.isEmpty())
{
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
}
@@ -593,7 +616,7 @@ public class SendingLink_1_0 implements
if(_subscription.isActive())
{
- _subscription.suspend();
+ _target.suspend();
}
_linkAttachment = linkAttachment;
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (from r1563758, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java&r1=1563758&r2=1563796&rev=1563796&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java Mon Feb 3 08:14:20 2014
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.nio.ByteBuffer;
-import java.util.List;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -41,18 +38,23 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.AbstractSubscription;
+import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-class Subscription_1_0 extends AbstractSubscription implements Subscription
+import java.nio.ByteBuffer;
+import java.util.List;
+
+class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
{
+ private final boolean _acquires;
private SendingLink_1_0 _link;
private long _deliveryTag = 0L;
@@ -60,15 +62,26 @@ class Subscription_1_0 extends AbstractS
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
private final SectionEncoder _sectionEncoder;
+ private Subscription _subscription;
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires, FilterManager filters)
+ public SubscriptionTarget_1_0(final SendingLink_1_0 link,
+ boolean acquires)
{
- super(filters,Message_1_0.class,link.getSession().getConnectionReference(), acquires, acquires, link.getEndpoint().getName(), false);
+ super(State.SUSPENDED);
_link = link;
_typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
_sectionEncoder = new SectionEncoderImpl(_typeRegistry);
- setQueue(destination.getQueue(),false);
- updateState(State.ACTIVE, State.SUSPENDED);
+ _acquires = acquires;
+ }
+
+ public void setSubscription(Subscription sub)
+ {
+ _subscription = sub;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
}
private SendingLinkEndpoint getEndpoint()
@@ -78,16 +91,16 @@ class Subscription_1_0 extends AbstractS
public boolean isSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
}
- public void close()
+ public boolean close()
{
boolean closed = false;
State state = getState();
- getSendLock();
+ getSubscription().getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -97,19 +110,16 @@ class Subscription_1_0 extends AbstractS
{
state = getState();
}
- else
- {
- getStateListener().stateChanged(this, state, State.CLOSED);
- }
}
+ return closed;
}
finally
{
- releaseSendLock();
+ getSubscription().releaseSendLock();
}
}
- protected void doSend(QueueEntry entry, boolean batch) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
// TODO
send(entry);
@@ -223,7 +233,7 @@ class Subscription_1_0 extends AbstractS
}
else
{
- UnsettledAction action = acquires()
+ UnsettledAction action = _acquires
? new DispositionAction(tag, queueEntry)
: new DoNothingAction(tag, queueEntry);
@@ -237,7 +247,7 @@ class Subscription_1_0 extends AbstractS
transfer.setState(state);
}
// TODO - need to deal with failure here
- if(acquires() && _transactionId != null)
+ if(_acquires && _transactionId != null)
{
ServerTransaction txn = _link.getTransaction(_transactionId);
if(txn != null)
@@ -251,7 +261,7 @@ class Subscription_1_0 extends AbstractS
public void onRollback()
{
- if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+ if(queueEntry.isAcquiredBy(getSubscription()))
{
queueEntry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -281,7 +291,7 @@ class Subscription_1_0 extends AbstractS
getEndpoint().detach();
}
- public boolean wouldSuspend(final QueueEntry msg)
+ public boolean allocateCredit(final QueueEntry msg)
{
synchronized (_link.getLock())
{
@@ -291,7 +301,7 @@ class Subscription_1_0 extends AbstractS
suspend();
}
- return !hasCredit;
+ return hasCredit;
}
}
@@ -300,10 +310,7 @@ class Subscription_1_0 extends AbstractS
{
synchronized(_link.getLock())
{
- if(updateState(State.ACTIVE, State.SUSPENDED))
- {
- getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
- }
+ updateState(State.ACTIVE, State.SUSPENDED);
}
}
@@ -319,10 +326,7 @@ class Subscription_1_0 extends AbstractS
{
if(_link.drained())
{
- if(updateState(State.ACTIVE, State.SUSPENDED))
- {
- getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
- }
+ updateState(State.ACTIVE, State.SUSPENDED);
}
}
}
@@ -333,10 +337,7 @@ class Subscription_1_0 extends AbstractS
{
if(isSuspended() && getEndpoint() != null)
{
- if(updateState(State.SUSPENDED, State.ACTIVE))
- {
- getStateListener().stateChanged(this, State.SUSPENDED, State.ACTIVE);
- }
+ updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
}
}
@@ -390,7 +391,7 @@ class Subscription_1_0 extends AbstractS
public void postCommit()
{
- if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
+ if(_queueEntry.isAcquiredBy(getSubscription()))
{
_queueEntry.delete();
}
@@ -500,7 +501,6 @@ class Subscription_1_0 extends AbstractS
@Override
public AMQSessionModel getSessionModel()
{
- // TODO
return getSession();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org