You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/09/13 15:52:24 UTC
svn commit: r1384336 - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/
hedwig-server/src/main/java/org/apache/hedwig/server/common/
Author: sijie
Date: Thu Sep 13 13:52:24 2012
New Revision: 1384336
URL: http://svn.apache.org/viewvc?rev=1384336&view=rev
Log:
BOOKKEEPER-392: Racey ConcurrentMap usage in java hedwig-client (Stu Hood via sijie)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Sep 13 13:52:24 2012
@@ -96,6 +96,8 @@ Trunk (unreleased changes)
BOOKKEEPER-371: NPE in hedwig hub client causes hedwig hub to shut down. (Aniruddha via sijie)
+ BOOKKEEPER-392: Racey ConcurrentMap usage in java hedwig-client (Stu Hood via sijie)
+
hedwig-server:
BOOKKEEPER-302: No more messages delivered when hub server scans messages over two ledgers. (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Thu Sep 13 13:52:24 2012
@@ -314,7 +314,7 @@ public class SubscribeResponseHandler {
// outstanding message limit was reached. For now, only turn the
// delivery back on if there are no more outstanding messages to
// consume. We could make this a configurable parameter if needed.
- if (!subscribeChannel.isReadable() && outstandingMsgSet.size() == 0) {
+ if (!subscribeChannel.isReadable() && outstandingMsgSet.isEmpty()) {
if (logger.isDebugEnabled())
logger
.debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: "
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Thu Sep 13 13:52:24 2012
@@ -18,11 +18,10 @@
package org.apache.hedwig.client.netty;
import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@@ -76,7 +75,8 @@ public class HedwigClientImpl implements
// also want to remove all topic mappings the host was responsible for.
// The second Map is used as the inverted version of the first one.
protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
- private final ConcurrentMap<InetSocketAddress, List<ByteString>> host2Topics = new ConcurrentHashMap<InetSocketAddress, List<ByteString>>();
+ private final ConcurrentMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>> host2Topics =
+ new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>>();
// Each client instantiation will have a Timer for running recurring
// threads. One such timer task thread to is to timeout long running
@@ -292,23 +292,27 @@ public class HedwigClientImpl implements
// server statuses, we consider that as a successful connection to the
// correct topic master.
InetSocketAddress host = getHostFromChannel(channel);
- if (topic2Host.containsKey(pubSubData.topic) && topic2Host.get(pubSubData.topic).equals(host)) {
+ InetSocketAddress existingHost = topic2Host.get(pubSubData.topic);
+ if (existingHost != null && existingHost.equals(host)) {
// Entry in map exists for the topic but it is the same as the
// current host. In this case there is nothing to do.
return;
}
// Store the relevant mappings for this topic and host combination.
- if (logger.isDebugEnabled())
- logger.debug("Storing info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
- + topic2Host.get(pubSubData.topic) + ", new host: " + host);
- topic2Host.put(pubSubData.topic, host);
- if (host2Topics.containsKey(host)) {
- host2Topics.get(host).add(pubSubData.topic);
- } else {
- LinkedList<ByteString> topicsList = new LinkedList<ByteString>();
- topicsList.add(pubSubData.topic);
- host2Topics.put(host, topicsList);
+ if (topic2Host.putIfAbsent(pubSubData.topic, host) == null) {
+ if (logger.isDebugEnabled())
+ logger.debug("Stored info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
+ + existingHost + ", new host: " + host);
+ ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
+ if (topicsForHost == null) {
+ ConcurrentLinkedQueue<ByteString> newTopicsList = new ConcurrentLinkedQueue<ByteString>();
+ topicsForHost = host2Topics.putIfAbsent(host, newTopicsList);
+ if (topicsForHost == null) {
+ topicsForHost = newTopicsList;
+ }
+ }
+ topicsForHost.add(pubSubData.topic);
}
}
@@ -360,14 +364,15 @@ public class HedwigClientImpl implements
logger.debug("Clearing all topics for host: " + host);
// For each of the topics that the host was responsible for,
// remove it from the topic2Host mapping.
- if (host2Topics.containsKey(host)) {
- for (ByteString topic : host2Topics.get(host)) {
+ ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
+ if (topicsForHost != null) {
+ for (ByteString topic : topicsForHost) {
if (logger.isDebugEnabled())
logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
- topic2Host.remove(topic);
+ topic2Host.remove(topic, host);
}
// Now it is safe to remove the host2Topics mapping entry.
- host2Topics.remove(host);
+ host2Topics.remove(host, topicsForHost);
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Thu Sep 13 13:52:24 2012
@@ -135,14 +135,15 @@ public class HedwigPublisher implements
// Check if we already have a Channel connection set up to the server
// for the given Topic.
PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, callback, context);
- if (client.topic2Host.containsKey(topic)) {
- InetSocketAddress host = client.topic2Host.get(topic);
- if (host2Channel.containsKey(host)) {
+ InetSocketAddress host = client.topic2Host.get(topic);
+ if (host != null) {
+ Channel channel = host2Channel.get(host);
+ if (channel != null) {
// We already have the Channel connection for the server host so
// do the publish directly. We will deal with redirect logic
// later on if that server is no longer the current host for
// the topic.
- doPublish(pubSubData, host2Channel.get(host));
+ doPublish(pubSubData, channel);
} else {
// We have a mapping for the topic to host but don't have a
// Channel for that server. This can happen if the Channel
@@ -155,10 +156,11 @@ public class HedwigPublisher implements
// default server host/port as defined in the configs. This should
// point to the server VIP which would redirect to a random server
// (which might not be the server hosting the topic).
- InetSocketAddress host = cfg.getDefaultServerHost();
- if (host2Channel.containsKey(host)) {
+ host = cfg.getDefaultServerHost();
+ Channel channel = host2Channel.get(host);
+ if (channel != null) {
// if there is a channel to default server, use it!
- doPublish(pubSubData, host2Channel.get(host));
+ doPublish(pubSubData, channel);
return;
}
client.doConnect(pubSubData, host);
@@ -226,10 +228,9 @@ public class HedwigPublisher implements
// RemoteAddress tied to it.
protected synchronized void storeHost2ChannelMapping(Channel channel) {
InetSocketAddress host = HedwigClientImpl.getHostFromChannel(channel);
- if (!closed && !host2Channel.containsKey(host)) {
+ if (!closed && host2Channel.putIfAbsent(host, channel) == null) {
if (logger.isDebugEnabled())
- logger.debug("Storing a new Channel mapping for host: " + host);
- host2Channel.put(host, channel);
+ logger.debug("Stored a new Channel mapping for host: " + host);
} else {
// If we've reached here, that means we already have a Channel
// mapping for the given host. This should ideally not happen
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Thu Sep 13 13:52:24 2012
@@ -84,7 +84,7 @@ public class HedwigSubscriber implements
protected final HedwigClientImpl client;
protected final ClientConfiguration cfg;
- private Object closeLock = new Object();
+ private final Object closeLock = new Object();
private boolean closed = false;
public HedwigSubscriber(HedwigClientImpl client) {
@@ -175,14 +175,17 @@ public class HedwigSubscriber implements
// subscribing to.
PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, callback,
context);
- if (client.topic2Host.containsKey(topic)) {
- InetSocketAddress host = client.topic2Host.get(topic);
- if (operationType.equals(OperationType.UNSUBSCRIBE) && client.getPublisher().host2Channel.containsKey(host)) {
+
+ InetSocketAddress host = client.topic2Host.get(topic);
+ if (host != null) {
+ Channel existingChannel = null;
+ if (operationType.equals(OperationType.UNSUBSCRIBE) &&
+ (existingChannel = client.getPublisher().host2Channel.get(host)) != null) {
// For unsubscribes, we can reuse the channel connections to the
// server host that are cached for publishes. For publish and
// unsubscribe flows, we will thus use the same Channels and
// will cache and store them during the ConnectCallback.
- doSubUnsub(pubSubData, client.getPublisher().host2Channel.get(host));
+ doSubUnsub(pubSubData, existingChannel);
} else {
// We know which server host is the master for the topic so
// connect to that first. For subscribes, we want a new channel
@@ -348,7 +351,8 @@ public class HedwigSubscriber implements
+ subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
// Check that this topic subscription on the client side exists.
- if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+ Channel channel = topicSubscriber2Channel.get(topicSubscriber);
+ if (channel == null) {
throw new ClientNotSubscribedException(
"Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
+ ", subscriberId: " + subscriberId.toStringUtf8());
@@ -356,7 +360,7 @@ public class HedwigSubscriber implements
PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
// Send the consume message to the server using the same subscribe
// channel that the topic subscription uses.
- doConsume(pubSubData, topicSubscriber2Channel.get(topicSubscriber), messageSeqId);
+ doConsume(pubSubData, channel, messageSeqId);
}
/**
@@ -586,20 +590,16 @@ public class HedwigSubscriber implements
// exists. The assumption is that the client should have in memory the
// Channel created for the TopicSubscriber once the server has sent
// an ack response to the initial subscribe request.
- if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+ Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+ if (topicSubscriberChannel == null) {
logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
+ subscriberId.toStringUtf8());
throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
+ ", subscriberId: " + subscriberId.toStringUtf8());
}
- // Register the MessageHandler with the subscribe Channel's
- // Response Handler.
- Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
-
// Need to ensure the setting of handler and the readability of channel is in sync
// as there's a race condition that connection recovery and user might call this at the same time
-
MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
if (restart) {
// restart using existing msg handler
@@ -620,7 +620,7 @@ public class HedwigSubscriber implements
.setMessageHandler(messageHandler);
} catch (NoResponseHandlerException e) {
// We did not find a response handler. So remove this subscription handler and throw an exception.
- topicSubscriber2MessageHandler.remove(topicSubscriber);
+ topicSubscriber2MessageHandler.remove(topicSubscriber, existedMsgHandler);
asyncCloseSubscription(topic, subscriberId, new Callback<Void>() {
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
@@ -664,7 +664,8 @@ public class HedwigSubscriber implements
// exists. The assumption is that the client should have in memory the
// Channel created for the TopicSubscriber once the server has sent
// an ack response to the initial subscribe request.
- if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+ Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+ if (topicSubscriberChannel == null) {
logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
+ subscriberId.toStringUtf8());
throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
@@ -673,7 +674,6 @@ public class HedwigSubscriber implements
// Unregister the MessageHandler for the subscribe Channel's
// Response Handler.
- Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
try {
HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
.setMessageHandler(null);
@@ -729,10 +729,9 @@ public class HedwigSubscriber implements
logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
+ subscriberId.toStringUtf8());
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- if (topicSubscriber2Channel.containsKey(topicSubscriber)) {
- // Remove all cached references for the TopicSubscriber
- Channel channel = topicSubscriber2Channel.get(topicSubscriber);
- topicSubscriber2Channel.remove(topicSubscriber);
+ // Remove all cached references for the TopicSubscriber
+ Channel channel = topicSubscriber2Channel.remove(topicSubscriber);
+ if (channel != null) {
// Close the subscribe channel asynchronously.
try {
HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
@@ -779,6 +778,7 @@ public class HedwigSubscriber implements
}
Channel oldc = topicSubscriber2Channel.putIfAbsent(topic, channel);
if (oldc != null) {
+ logger.warn("Dropping new channel for " + topic + ", due to existing channel: " + oldc);
channel.close();
}
if (null != preferences) {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Thu Sep 13 13:52:24 2012
@@ -127,9 +127,8 @@ public class ResponseHandler extends Sim
// Response is an ack to a prior PubSubRequest so first retrieve the
// PubSub data for this txn.
- PubSubData pubSubData = txn2PubSubData.containsKey(response.getTxnId()) ? txn2PubSubData.get(response
- .getTxnId()) : null;
- // Validate that the PubSub data for this txn is stored. If not, just
+ PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
+ // Validate that the PubSub data for this txn was stored. If not, just
// log an error message and return since we don't know how to handle
// this.
if (pubSubData == null) {
@@ -137,10 +136,6 @@ public class ResponseHandler extends Sim
return;
}
- // Now that we've retrieved the PubSubData for this specific Txn ID, we
- // can remove it from the Map.
- txn2PubSubData.remove(response.getTxnId());
-
// Store the topic2Host mapping if this wasn't a server redirect. We'll
// assume that if the server was able to have an open Channel connection
// to the client, and responded with an ack message other than the
@@ -242,8 +237,8 @@ public class ResponseHandler extends Sim
// Check if we already have a Channel open to the redirected server
// host.
- boolean redirectedHostChannelExists = pub.host2Channel.containsKey(redirectedHost) ? true : false;
- if (pubSubData.operationType.equals(OperationType.SUBSCRIBE) || !redirectedHostChannelExists) {
+ Channel redirectedHostChannel = pub.host2Channel.get(redirectedHost);
+ if (pubSubData.operationType.equals(OperationType.SUBSCRIBE) || redirectedHostChannel == null) {
// We don't have an existing channel to the redirected host OR this
// is a redirected Subscribe request. For Subscribe requests, we
// always want to create a new unique Channel connection to the
@@ -254,9 +249,9 @@ public class ResponseHandler extends Sim
// request again directly on the existing cached redirected host
// channel.
if (pubSubData.operationType.equals(OperationType.PUBLISH)) {
- pub.doPublish(pubSubData, pub.host2Channel.get(redirectedHost));
+ pub.doPublish(pubSubData, redirectedHostChannel);
} else if (pubSubData.operationType.equals(OperationType.UNSUBSCRIBE)) {
- sub.doSubUnsub(pubSubData, pub.host2Channel.get(redirectedHost));
+ sub.doSubUnsub(pubSubData, redirectedHostChannel);
}
}
}
@@ -295,12 +290,14 @@ public class ResponseHandler extends Sim
// Due to race concurrency situations, it is possible to
// create multiple channels to the same host for publish
// and unsubscribe requests.
- if (pub.host2Channel.containsKey(host) && pub.host2Channel.get(host).equals(ctx.getChannel())) {
+ Channel channel = pub.host2Channel.get(host);
+ if (channel != null && channel.equals(ctx.getChannel())) {
if (logger.isDebugEnabled())
logger.debug("Disconnected channel for host: " + host
+ " was for Publish/Unsubscribe requests so remove all references to it.");
- pub.host2Channel.remove(host);
- client.clearAllTopicsForHost(host);
+ if (pub.host2Channel.remove(host, channel)) {
+ client.clearAllTopicsForHost(host);
+ }
}
} else {
// Subscribe channel disconnected so first close and clear all
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java Thu Sep 13 13:52:24 2012
@@ -26,20 +26,13 @@ public class ByteStringInterner {
// TODO: how to release references when strings are no longer used. weak
// references?
- private static ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
+ private static final ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
public static ByteString intern(ByteString in) {
- ByteString presentValueInMap = map.get(in);
+ ByteString presentValueInMap = map.putIfAbsent(in, in);
if (presentValueInMap != null) {
return presentValueInMap;
}
-
- presentValueInMap = map.putIfAbsent(in, in);
- if (presentValueInMap != null) {
- return presentValueInMap;
- }
-
return in;
-
}
}