You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/26 12:19:19 UTC
svn commit: r1526438 - in /qpid/trunk/qpid/java:
amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messagin...
Author: rgodfrey
Date: Thu Sep 26 10:19:19 2013
New Revision: 1526438
URL: http://svn.apache.org/r1526438
Log:
QPID-4901 : Queue Browser hangs on reaching end of queue
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java?rev=1526438&r1=1526437&r2=1526438&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java Thu Sep 26 10:19:19 2013
@@ -146,13 +146,26 @@ public class QueueBrowserImpl implements
if( _needNext )
{
_needNext = false;
- _nextElement = createJMSMessage(_receiver.receive(0L));
+ Message msg = _receiver.receive(0L);
+ if(msg != null)
+ {
+ _receiver.acknowledge(msg);
+ }
+ _nextElement = createJMSMessage(msg);
if( _nextElement == null )
{
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
// Drain to verify there really are no more messages.
_receiver.drain();
_receiver.drainWait();
- _nextElement = createJMSMessage(_receiver.receive(0L));
+ msg = _receiver.receive(0L);
+
+ if(msg != null)
+ {
+ _receiver.acknowledge(msg);
+ }
+ _nextElement = createJMSMessage(msg);
+
if( _nextElement == null )
{
close();
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1526438&r1=1526437&r2=1526438&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Thu Sep 26 10:19:19 2013
@@ -444,13 +444,25 @@ public abstract class LinkEndpoint<T ext
sendFlow(_flowTransactionId != null);
}
+ public void sendFlowWithEcho()
+ {
+ sendFlow(_flowTransactionId != null, true);
+ }
+
+
public void sendFlow(boolean setTransactionId)
{
+ sendFlow(setTransactionId, false);
+ }
+
+ public void sendFlow(boolean setTransactionId, boolean echo)
+ {
if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
{
Flow flow = new Flow();
flow.setLinkCredit(_linkCredit);
flow.setDeliveryCount(_deliveryCount);
+ flow.setEcho(echo);
_lastSentCreditLimit = _linkCredit.add(_deliveryCount);
flow.setAvailable(_available);
flow.setDrain(_drain);
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1526438&r1=1526437&r2=1526438&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Thu Sep 26 10:19:19 2013
@@ -288,7 +288,7 @@ public class ReceivingLinkEndpoint exten
setDrain(true);
_creditWindow = false;
_drainLimit = getDeliveryCount().add(getLinkCredit());
- sendFlow();
+ sendFlowWithEcho();
getLock().notifyAll();
}
}
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java?rev=1526438&r1=1526437&r2=1526438&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java Thu Sep 26 10:19:19 2013
@@ -231,7 +231,7 @@ public class SourceConstructor extends D
try
{
- obj.setDistributionMode( (DistributionMode) val );
+ obj.setDistributionMode( StdDistMode.valueOf(val) );
}
catch(ClassCastException e)
{
@@ -326,7 +326,7 @@ public class SourceConstructor extends D
// TODO Error
}
}
-
+
}
@@ -360,7 +360,7 @@ public class SourceConstructor extends D
// TODO Error
}
}
-
+
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1526438&r1=1526437&r2=1526438&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Sep 26 10:19:19 2013
@@ -164,7 +164,7 @@ public class SendingLink_1_0 implements
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _subscription = new Subscription_1_0(this, qd);
+ _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
}
else if(destination instanceof ExchangeDestination)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org