You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/11/28 23:01:54 UTC
svn commit: r1207648 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
Author: breed
Date: Mon Nov 28 22:01:53 2011
New Revision: 1207648
URL: http://svn.apache.org/viewvc?rev=1207648&view=rev
Log:
BOOKKEEPER-53: race condition of outstandingMsgSet@SubscribeResponseHandler
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1207648&r1=1207647&r2=1207648&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 28 22:01:53 2011
@@ -114,6 +114,8 @@ BUGFIXES:
BOOKKEEPER-117: Support multi threads in hedwig cpp client to leverage multi-core hardware (Sijie Guo via ivank)
+ BOOKKEEPER-53: race condition of outstandingMsgSet@SubscribeResponseHandler (fpj via breed)
+
IMPROVEMENTS:
BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1207648&r1=1207647&r2=1207648&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Mon Nov 28 22:01:53 2011
@@ -17,7 +17,8 @@
*/
package org.apache.hedwig.client.handlers;
-import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
@@ -112,18 +113,20 @@ public class SubscribeResponseHandler {
// this only on a successful ack response from the server.
TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
- // Lazily create the Set to keep track of outstanding Messages
- // to be consumed by the client app. At this stage, delivery for
- // that topic hasn't started yet so creation of this Set should
- // be thread safe. We'll create the Set with an initial capacity
- // equal to the configured parameter for the maximum number of
+ // Lazily create the Set (from a concurrent hashmap) to keep track
+ // of outstanding Messages to be consumed by the client app. At this
+ // stage, delivery for that topic hasn't started yet so creation of
+ // this Set should be thread safe. We'll create the Set with an initial
+ // capacity equal to the configured parameter for the maximum number of
// outstanding messages to allow. The load factor will be set to
// 1.0f which means we'll only rehash and allocate more space if
// we ever exceed the initial capacity. That should be okay
// because when that happens, things are slow already and piling
// up on the client app side to consume messages.
- outstandingMsgSet = new HashSet<Message>(
- responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f);
+
+ outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap<Message,Boolean>(
+ responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
+
// Response was success so invoke the callback's operationFinished
// method.
pubSubData.callback.operationFinished(pubSubData.context, null);