You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/08/30 12:13:51 UTC

svn commit: r1163161 - in /zookeeper/bookkeeper/trunk: CHANGES.txt hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java

Author: ivank
Date: Tue Aug 30 10:13:51 2011
New Revision: 1163161

URL: http://svn.apache.org/viewvc?rev=1163161&view=rev
Log:
BOOKKEEPER-52: Message sequence confuse due to the subscribeMsgQueue@SubscribeResponseHandler (xulei via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1163161&r1=1163160&r2=1163161&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Aug 30 10:13:51 2011
@@ -42,6 +42,10 @@ BUGFIXES:
   
   BOOKKEEPER-51: NullPointException at FIFODeliveryManager#deliveryPtrs (xulei via ivank)
 
+ hedwig-client/
+ 
+  BOOKKEEPER-52: Message sequence confuse due to the subscribeMsgQueue@SubscribeResponseHandler (xulei via ivank)
+
 IMPROVEMENTS:
 
  BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1163161&r1=1163160&r2=1163161&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Tue Aug 30 10:13:51 2011
@@ -162,29 +162,30 @@ public class SubscribeResponseHandler {
             logger.debug("Handling a Subscribe message in response: " + response + ", topic: "
                     + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
         Message message = response.getMessage();
-        // Consume the message asynchronously that the client is subscribed
-        // to. Do this only if delivery for the subscription has started and
-        // a MessageHandler has been registered for the TopicSubscriber.
-        if (messageHandler != null) {
-            asyncMessageConsume(message);
-        } else {
-            // MessageHandler has not yet been registered so queue up these
-            // messages for the Topic Subscription. Make the initial lazy
-            // creation of the message queue thread safe just so we don't
-            // run into a race condition where two simultaneous threads process
-            // a received message and both try to create a new instance of
-            // the message queue. Performance overhead should be okay
-            // because the delivery of the topic has not even started yet
-            // so these messages are not consumed and just buffered up here.
-            synchronized (this) {
+
+        synchronized (this) {
+            // Consume the message asynchronously that the client is subscribed
+            // to. Do this only if delivery for the subscription has started and
+            // a MessageHandler has been registered for the TopicSubscriber.
+            if (messageHandler != null) {
+                asyncMessageConsume(message);
+            } else {
+                // MessageHandler has not yet been registered so queue up these
+                // messages for the Topic Subscription. Make the initial lazy
+                // creation of the message queue thread safe just so we don't
+                // run into a race condition where two simultaneous threads process
+                // a received message and both try to create a new instance of
+                // the message queue. Performance overhead should be okay
+                // because the delivery of the topic has not even started yet
+                // so these messages are not consumed and just buffered up here.
                 if (subscribeMsgQueue == null)
                     subscribeMsgQueue = new LinkedList<Message>();
-            }
-            if (logger.isDebugEnabled())
-                logger
+                if (logger.isDebugEnabled())
+                    logger
                         .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: "
                                 + message);
-            subscribeMsgQueue.add(message);
+                subscribeMsgQueue.add(message);
+            }
         }
     }
 
@@ -298,23 +299,25 @@ public class SubscribeResponseHandler {
         if (logger.isDebugEnabled())
             logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8()
                     + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
-        this.messageHandler = messageHandler;
-        // Once the MessageHandler is registered, see if we have any queued up
-        // subscription messages sent to us already from the server. If so,
-        // consume those first. Do this only if the MessageHandler registered is
-        // not null (since that would be the HedwigSubscriber.stopDelivery
-        // call).
-        if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
-            if (logger.isDebugEnabled())
-                logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
-                        + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                        + origSubData.subscriberId.toStringUtf8());
-            for (Message message : subscribeMsgQueue) {
-                asyncMessageConsume(message);
+        synchronized (this) {
+            this.messageHandler = messageHandler;
+            // Once the MessageHandler is registered, see if we have any queued up
+            // subscription messages sent to us already from the server. If so,
+            // consume those first. Do this only if the MessageHandler registered is
+            // not null (since that would be the HedwigSubscriber.stopDelivery
+            // call).
+            if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
+                if (logger.isDebugEnabled())
+                    logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
+                            + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                            + origSubData.subscriberId.toStringUtf8());
+                for (Message message : subscribeMsgQueue) {
+                    asyncMessageConsume(message);
+                }
+                // Now we can remove the queued up messages since they are all
+                // consumed.
+                subscribeMsgQueue.clear();
             }
-            // Now we can remove the queued up messages since they are all
-            // consumed.
-            subscribeMsgQueue.clear();
         }
     }