You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/08/30 12:13:51 UTC
svn commit: r1163161 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
Author: ivank
Date: Tue Aug 30 10:13:51 2011
New Revision: 1163161
URL: http://svn.apache.org/viewvc?rev=1163161&view=rev
Log:
BOOKKEEPER-52: Message sequence confusion due to the subscribeMsgQueue@SubscribeResponseHandler (xulei via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1163161&r1=1163160&r2=1163161&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Aug 30 10:13:51 2011
@@ -42,6 +42,10 @@ BUGFIXES:
BOOKKEEPER-51: NullPointException at FIFODeliveryManager#deliveryPtrs (xulei via ivank)
+ hedwig-client/
+
+ BOOKKEEPER-52: Message sequence confusion due to the subscribeMsgQueue@SubscribeResponseHandler (xulei via ivank)
+
IMPROVEMENTS:
BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1163161&r1=1163160&r2=1163161&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Tue Aug 30 10:13:51 2011
@@ -162,29 +162,30 @@ public class SubscribeResponseHandler {
logger.debug("Handling a Subscribe message in response: " + response + ", topic: "
+ origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
Message message = response.getMessage();
- // Consume the message asynchronously that the client is subscribed
- // to. Do this only if delivery for the subscription has started and
- // a MessageHandler has been registered for the TopicSubscriber.
- if (messageHandler != null) {
- asyncMessageConsume(message);
- } else {
- // MessageHandler has not yet been registered so queue up these
- // messages for the Topic Subscription. Make the initial lazy
- // creation of the message queue thread safe just so we don't
- // run into a race condition where two simultaneous threads process
- // a received message and both try to create a new instance of
- // the message queue. Performance overhead should be okay
- // because the delivery of the topic has not even started yet
- // so these messages are not consumed and just buffered up here.
- synchronized (this) {
+
+ synchronized (this) {
+ // Consume the message asynchronously that the client is subscribed
+ // to. Do this only if delivery for the subscription has started and
+ // a MessageHandler has been registered for the TopicSubscriber.
+ if (messageHandler != null) {
+ asyncMessageConsume(message);
+ } else {
+ // MessageHandler has not yet been registered so queue up these
+ // messages for the Topic Subscription. Make the initial lazy
+ // creation of the message queue thread safe just so we don't
+ // run into a race condition where two simultaneous threads process
+ // a received message and both try to create a new instance of
+ // the message queue. Performance overhead should be okay
+ // because the delivery of the topic has not even started yet
+ // so these messages are not consumed and just buffered up here.
if (subscribeMsgQueue == null)
subscribeMsgQueue = new LinkedList<Message>();
- }
- if (logger.isDebugEnabled())
- logger
+ if (logger.isDebugEnabled())
+ logger
.debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: "
+ message);
- subscribeMsgQueue.add(message);
+ subscribeMsgQueue.add(message);
+ }
}
}
@@ -298,23 +299,25 @@ public class SubscribeResponseHandler {
if (logger.isDebugEnabled())
logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8()
+ ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
- this.messageHandler = messageHandler;
- // Once the MessageHandler is registered, see if we have any queued up
- // subscription messages sent to us already from the server. If so,
- // consume those first. Do this only if the MessageHandler registered is
- // not null (since that would be the HedwigSubscriber.stopDelivery
- // call).
- if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
- if (logger.isDebugEnabled())
- logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
- for (Message message : subscribeMsgQueue) {
- asyncMessageConsume(message);
+ synchronized (this) {
+ this.messageHandler = messageHandler;
+ // Once the MessageHandler is registered, see if we have any queued up
+ // subscription messages sent to us already from the server. If so,
+ // consume those first. Do this only if the MessageHandler registered is
+ // not null (since that would be the HedwigSubscriber.stopDelivery
+ // call).
+ if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
+ if (logger.isDebugEnabled())
+ logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
+ + origSubData.topic.toStringUtf8() + ", subscriberId: "
+ + origSubData.subscriberId.toStringUtf8());
+ for (Message message : subscribeMsgQueue) {
+ asyncMessageConsume(message);
+ }
+ // Now we can remove the queued up messages since they are all
+ // consumed.
+ subscribeMsgQueue.clear();
}
- // Now we can remove the queued up messages since they are all
- // consumed.
- subscribeMsgQueue.clear();
}
}