You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/11/28 23:01:54 UTC

svn commit: r1207648 - in /zookeeper/bookkeeper/trunk: CHANGES.txt hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java

Author: breed
Date: Mon Nov 28 22:01:53 2011
New Revision: 1207648

URL: http://svn.apache.org/viewvc?rev=1207648&view=rev
Log:
BOOKKEEPER-53: race condition of outstandingMsgSet@SubscribeResponseHandler

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1207648&r1=1207647&r2=1207648&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 28 22:01:53 2011
@@ -114,6 +114,8 @@ BUGFIXES:
 
   BOOKKEEPER-117: Support multi threads in hedwig cpp client to leverage multi-core hardware (Sijie Guo via ivank)
 
+  BOOKKEEPER-53: race condition of outstandingMsgSet@SubscribeResponseHandler (fpj via breed)
+
 IMPROVEMENTS:
 
  BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1207648&r1=1207647&r2=1207648&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Mon Nov 28 22:01:53 2011
@@ -17,7 +17,8 @@
  */
 package org.apache.hedwig.client.handlers;
 
-import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
@@ -112,18 +113,20 @@ public class SubscribeResponseHandler {
             // this only on a successful ack response from the server.
             TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
             responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
-            // Lazily create the Set to keep track of outstanding Messages
-            // to be consumed by the client app. At this stage, delivery for
-            // that topic hasn't started yet so creation of this Set should
-            // be thread safe. We'll create the Set with an initial capacity
-            // equal to the configured parameter for the maximum number of
+            // Lazily create the Set (from a concurrent hashmap) to keep track
+            // of outstanding Messages to be consumed by the client app. At this
+            // stage, delivery for that topic hasn't started yet so creation of 
+            // this Set should be thread safe. We'll create the Set with an initial
+            // capacity equal to the configured parameter for the maximum number of
             // outstanding messages to allow. The load factor will be set to
             // 1.0f which means we'll only rehash and allocate more space if
             // we ever exceed the initial capacity. That should be okay
             // because when that happens, things are slow already and piling
             // up on the client app side to consume messages.
-            outstandingMsgSet = new HashSet<Message>(
-                responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f);
+            
+            outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap<Message,Boolean>(
+                responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
+            
             // Response was success so invoke the callback's operationFinished
             // method.
             pubSubData.callback.operationFinished(pubSubData.context, null);