You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/04 23:07:45 UTC
svn commit: r1564522 - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apache/qpid/server/subscripti...
Author: rgodfrey
Date: Tue Feb 4 22:07:44 2014
New Revision: 1564522
URL: http://svn.apache.org/r1564522
Log:
move state to MessageInstance from QueueEntry, change isRejectedBy to work on Subscription rather than id
Added:
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
- copied, changed from r1564445, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
- copied, changed from r1564445, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java
Removed:
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java
Modified:
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Tue Feb 4 22:07:44 2014
@@ -22,10 +22,131 @@ package org.apache.qpid.server.message;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.subscription.Subscription;
public interface MessageInstance
{
+
+
+ public static enum State
+ {
+ AVAILABLE,
+ ACQUIRED,
+ DEQUEUED,
+ DELETED
+ }
+
+ public abstract class EntryState
+ {
+ private EntryState()
+ {
+ }
+
+ public abstract State getState();
+
+ /**
+ * Returns true if state is either DEQUEUED or DELETED.
+ *
+ * @return true if state is either DEQUEUED or DELETED.
+ */
+ public boolean isDispensed()
+ {
+ State currentState = getState();
+ return currentState == State.DEQUEUED || currentState == State.DELETED;
+ }
+ }
+
+
+ public final class AvailableState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.AVAILABLE;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+
+ public final class DequeuedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DEQUEUED;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+
+ public final class DeletedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DELETED;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+ public final class NonSubscriptionAcquiredState extends EntryState
+ {
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+ public final class SubscriptionAcquiredState extends EntryState
+ {
+ private final Subscription _subscription;
+
+ public SubscriptionAcquiredState(Subscription subscription)
+ {
+ _subscription = subscription;
+ }
+
+
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
+ }
+
+
+ final static EntryState AVAILABLE_STATE = new AvailableState();
+ final static EntryState DELETED_STATE = new DeletedState();
+ final static EntryState DEQUEUED_STATE = new DequeuedState();
+ final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+
boolean isAvailable();
boolean acquire();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Feb 4 22:07:44 2014
@@ -31,147 +31,6 @@ import org.apache.qpid.server.util.State
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
-
-
- public static enum State
- {
- AVAILABLE,
- ACQUIRED,
- EXPIRED,
- DEQUEUED,
- DELETED;
-
-
- }
-
- public abstract class EntryState
- {
- private EntryState()
- {
- }
-
- public abstract State getState();
-
- /**
- * Returns true if state is either DEQUEUED or DELETED.
- *
- * @return true if state is either DEQUEUED or DELETED.
- */
- public boolean isDispensed()
- {
- State currentState = getState();
- return currentState == State.DEQUEUED || currentState == State.DELETED;
- }
- }
-
-
- public final class AvailableState extends EntryState
- {
-
- public State getState()
- {
- return State.AVAILABLE;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DequeuedState extends EntryState
- {
-
- public State getState()
- {
- return State.DEQUEUED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DeletedState extends EntryState
- {
-
- public State getState()
- {
- return State.DELETED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class ExpiredState extends EntryState
- {
-
- public State getState()
- {
- return State.EXPIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class NonSubscriptionAcquiredState extends EntryState
- {
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class SubscriptionAcquiredState extends EntryState
- {
- private final Subscription _subscription;
-
- public SubscriptionAcquiredState(Subscription subscription)
- {
- _subscription = subscription;
- }
-
-
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public Subscription getSubscription()
- {
- return _subscription;
- }
-
- public String toString()
- {
- return "{" + getState().name() + " : " + _subscription +"}";
- }
- }
-
-
- final static EntryState AVAILABLE_STATE = new AvailableState();
- final static EntryState DELETED_STATE = new DeletedState();
- final static EntryState DEQUEUED_STATE = new DequeuedState();
- final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
-
AMQQueue getQueue();
long getSize();
@@ -193,7 +52,7 @@ public interface QueueEntry extends Mess
void reject();
- boolean isRejectedBy(long subscriptionId);
+ boolean isRejectedBy(Subscription subscription);
int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Feb 4 22:07:44 2014
@@ -294,12 +294,12 @@ public abstract class QueueEntryImpl imp
}
}
- public boolean isRejectedBy(long subscriptionId)
+ public boolean isRejectedBy(Subscription subscription)
{
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
- return _rejectedBy.contains(subscriptionId);
+ return _rejectedBy.contains(subscription.getSubscriptionID());
}
else // This message hasn't been rejected yet.
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java Tue Feb 4 22:07:44 2014
@@ -227,13 +227,13 @@ class QueueSubscription<T extends Subscr
@Override
public boolean wouldSuspend(final QueueEntry msg)
{
- return !_target.allocateCredit(msg);
+ return !_target.allocateCredit(msg.getMessage());
}
@Override
public void restoreCredit(final QueueEntry queueEntry)
{
- _target.restoreCredit(queueEntry);
+ _target.restoreCredit(queueEntry.getMessage());
}
@Override
@@ -356,7 +356,7 @@ class QueueSubscription<T extends Subscr
public final boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
+ if (entry.isRejectedBy(this))
{
return false;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java Tue Feb 4 22:07:44 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.util.StateChangeListener;
@@ -56,9 +57,9 @@ public interface SubscriptionTarget
void queueEmpty() throws AMQException;
- boolean allocateCredit(QueueEntry msg);
+ boolean allocateCredit(ServerMessage msg);
- void restoreCredit(QueueEntry queueEntry);
+ void restoreCredit(ServerMessage queueEntry);
boolean isSuspended();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Tue Feb 4 22:07:44 2014
@@ -118,7 +118,7 @@ public class MockQueueEntry implements Q
}
- public boolean isRejectedBy(long subscriptionId)
+ public boolean isRejectedBy(Subscription subscription)
{
return false;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Tue Feb 4 22:07:44 2014
@@ -23,8 +23,7 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.QueueEntry.EntryState;
-import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.subscription.Subscription;
import java.lang.reflect.Field;
@@ -154,9 +153,8 @@ public abstract class QueueEntryImplTest
public void testRejectAndRejectedBy()
{
Subscription sub = newMockSubscription();
- long subId = sub.getSubscriptionID();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+ assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
//acquire, reject, and release the message using the subscription
@@ -165,19 +163,18 @@ public abstract class QueueEntryImplTest
_queueEntry.release();
//verify the rejection is recorded
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
//repeat rejection using a second subscription
Subscription sub2 = newMockSubscription();
- long sub2Id = sub2.getSubscriptionID();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
_queueEntry.reject();
//verify it still records being rejected by both subscriptions
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
}
/**
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Tue Feb 4 22:07:44 2014
@@ -27,15 +27,14 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.StateChangeListener;
@@ -153,7 +152,7 @@ public class MockSubscription implements
{
}
- public void restoreCredit(QueueEntry queueEntry)
+ public void restoreCredit(ServerMessage message)
{
}
@@ -215,7 +214,7 @@ public class MockSubscription implements
}
@Override
- public boolean allocateCredit(final QueueEntry msg)
+ public boolean allocateCredit(final ServerMessage msg)
{
return true;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Tue Feb 4 22:07:44 2014
@@ -44,7 +44,7 @@ public class MessageAcceptCompletionList
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry);
+ _sub.restoreCredit(_entry.getMessage());
}
if(_entry.isAcquiredBy(_sub.getSubscription()))
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java Tue Feb 4 22:07:44 2014
@@ -344,7 +344,7 @@ public class SubscriptionTarget_0_10 ext
{
if (restoreCredit)
{
- restoreCredit(entry);
+ restoreCredit(entry.getMessage());
}
entry.delete();
}
@@ -433,14 +433,14 @@ public class SubscriptionTarget_0_10 ext
_deleted.set(true);
}
- public boolean allocateCredit(QueueEntry entry)
+ public boolean allocateCredit(ServerMessage message)
{
- return _creditManager.useCreditForMessage(entry.getMessage().getSize());
+ return _creditManager.useCreditForMessage(message.getSize());
}
- public void restoreCredit(QueueEntry queueEntry)
+ public void restoreCredit(ServerMessage message)
{
- _creditManager.restoreCredit(1, queueEntry.getSize());
+ _creditManager.restoreCredit(1, message.getSize());
}
public FlowCreditManager_0_10 getCreditManager()
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Feb 4 22:07:44 2014
@@ -62,16 +62,12 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Feb 4 22:07:44 2014
@@ -94,7 +94,6 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java Tue Feb 4 22:07:44 2014
@@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.virtualhost.VirtualHost;
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java (from r1564445, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java&r1=1564445&r2=1564522&rev=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java Tue Feb 4 22:07:44 2014
@@ -18,11 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
public interface ClientDeliveryMethod
{
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java (from r1564445, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java&r1=1564445&r2=1564522&rev=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java Tue Feb 4 22:07:44 2014
@@ -18,9 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
public interface RecordDeliveryMethod
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java Tue Feb 4 22:07:44 2014
@@ -26,8 +26,6 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -35,8 +33,6 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -62,7 +58,7 @@ public abstract class SubscriptionTarget
{
if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
{
- restoreCredit(entry);
+ restoreCredit(entry.getMessage());
}
entry.removeStateChangeListener(this);
}
@@ -120,7 +116,7 @@ public abstract class SubscriptionTarget
}
@Override
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
return true;
}
@@ -201,7 +197,7 @@ public abstract class SubscriptionTarget
}
@Override
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
return true;
}
@@ -236,9 +232,9 @@ public abstract class SubscriptionTarget
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
- return getCreditManager().useCreditForMessage(msg.getMessage().getSize());
+ return getCreditManager().useCreditForMessage(msg.getSize());
}
}
@@ -441,9 +437,9 @@ public abstract class SubscriptionTarget
}
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
- return _creditManager.useCreditForMessage(msg.getMessage().getSize());
+ return _creditManager.useCreditForMessage(msg.getSize());
}
public AMQChannel getChannel()
@@ -461,9 +457,9 @@ public abstract class SubscriptionTarget
return _channel.getProtocolSession();
}
- public void restoreCredit(final QueueEntry queueEntry)
+ public void restoreCredit(final ServerMessage message)
{
- _creditManager.restoreCredit(1, queueEntry.getSize());
+ _creditManager.restoreCredit(1, message.getSize());
}
protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener()
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Tue Feb 4 22:07:44 2014
@@ -42,8 +42,8 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Tue Feb 4 22:07:44 2014
@@ -23,20 +23,20 @@ package org.apache.qpid.server.protocol.
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -62,40 +62,51 @@ public class ExtractResendAndRequeueTest
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue(getName());
+ private AMQQueue _queue;
private MessageStore _messageStore = new TestMemoryMessageStore();
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+ private Subscription _subscription;
+ private boolean _queueDeleted;
@Override
public void setUp() throws AMQException
{
+ _queueDeleted = false;
_unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn(getName());
+ when(_queue.isDeleted()).thenReturn(_queueDeleted);
+ _subscription = mock(Subscription.class);
+ when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+
long id = 0;
- SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
// Add initial messages to QueueEntryList
for (int count = 0; count < INITIAL_MSG_COUNT; count++)
{
- AMQMessage msg = new MockAMQMessage(id);
-
- list.add(msg);
+ ServerMessage msg = mock(ServerMessage.class);
+ when(msg.getMessageNumber()).thenReturn(id);
+ final QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getMessage()).thenReturn(msg);
+ when(entry.getQueue()).thenReturn(_queue);
+ when(entry.isQueueDeleted()).thenReturn(_queueDeleted);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ when(entry.isDeleted()).thenReturn(true);
+ return null;
+ }
+ }).when(entry).delete();
+ _unacknowledgedMessageMap.add(id, entry);
+ _referenceList.add(entry);
//Increment ID;
id++;
}
- // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
- QueueEntryIterator queueEntries = list.iterator();
- while(queueEntries.advance())
- {
- QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
- // Store the entry for future inspection
- _referenceList.add(entry);
- }
-
assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
}
@@ -106,19 +117,14 @@ public class ExtractResendAndRequeueTest
*
* @return Subscription that performed the acquire
*/
- private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
+ private void acquireMessages(LinkedList<QueueEntry> messageList)
{
- Subscription subscription = mock(Subscription.class);
- when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
- when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
// Acquire messages in subscription
- for (QueueEntry entry : messageList)
+ for(QueueEntry entry : messageList)
{
- entry.acquire(subscription);
+ when(entry.getDeliveredSubscription()).thenReturn(_subscription);
}
-
- return subscription;
}
/**
@@ -133,7 +139,7 @@ public class ExtractResendAndRequeueTest
public void testResend() throws AMQException
{
//We don't need the subscription object here.
- createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
@@ -159,10 +165,10 @@ public class ExtractResendAndRequeueTest
*/
public void testRequeueDueToSubscriptionClosure() throws AMQException
{
- Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
// Close subscription
- when(subscription.isClosed()).thenReturn(true);
+ when(_subscription.isClosed()).thenReturn(true);
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
@@ -239,8 +245,7 @@ public class ExtractResendAndRequeueTest
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
- _queue.delete();
-
+ _queueDeleted = true;
// requeueIfUnableToResend : value doesn't matter here as queue has been deleted
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
msgToResend, false, _messageStore));
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Tue Feb 4 22:07:44 2014
@@ -47,10 +47,8 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java?rev=1564522&r1=1564521&r2=1564522&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java Tue Feb 4 22:07:44 2014
@@ -284,7 +284,7 @@ class SubscriptionTarget_1_0 extends Abs
getEndpoint().detach();
}
- public boolean allocateCredit(final QueueEntry msg)
+ public boolean allocateCredit(final ServerMessage msg)
{
synchronized (_link.getLock())
{
@@ -308,7 +308,7 @@ class SubscriptionTarget_1_0 extends Abs
}
- public void restoreCredit(final QueueEntry queueEntry)
+ public void restoreCredit(final ServerMessage message)
{
//TODO
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org