You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/04/24 19:37:55 UTC
svn commit: r1329883 - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/api/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-server...
Author: ivank
Date: Tue Apr 24 17:37:54 2012
New Revision: 1329883
URL: http://svn.apache.org/viewvc?rev=1329883&view=rev
Log:
BOOKKEEPER-56: Race condition of message handler in connection recovery in Hedwig client (sijie & Gavin Li via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Apr 24 17:37:54 2012
@@ -102,6 +102,8 @@ Trunk (unreleased changes)
BOOKKEEPER-197: HedwigConsole uses the same file to load bookkeeper client config and hub server config (sijie)
+ BOOKKEEPER-56: Race condition of message handler in connection recovery in Hedwig client (sijie & Gavin Li via ivank)
+
bookkeeper-benchmark/
BOOKKEEPER-207: BenchBookie doesn't run correctly (ivank via fpj)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Tue Apr 24 17:37:54 2012
@@ -20,6 +20,7 @@ package org.apache.hedwig.client.api;
import java.util.List;
import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
@@ -232,9 +233,11 @@ public interface Subscriber {
* Message Handler that will consume the subscribed messages
* @throws ClientNotSubscribedException
* If the client is not currently subscribed to the topic
+ * @throws AlreadyStartDeliveryException
+ * If delivery has already been started with a message handler and has not been stopped yet.
*/
public void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler messageHandler)
- throws ClientNotSubscribedException;
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException;
/**
* Stop delivery of messages for this topic and subscriberId.
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java Tue Apr 24 17:37:54 2012
@@ -25,9 +25,11 @@ import org.slf4j.LoggerFactory;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.netty.HedwigClientImpl;
import org.apache.hedwig.client.netty.HedwigSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
+
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.util.Callback;
@@ -48,15 +50,13 @@ public class SubscribeReconnectCallback
private final HedwigClientImpl client;
private final HedwigSubscriber sub;
private final ClientConfiguration cfg;
- private final MessageHandler messageHandler;
// Constructor
- public SubscribeReconnectCallback(PubSubData origSubData, HedwigClientImpl client, MessageHandler messageHandler) {
+ public SubscribeReconnectCallback(PubSubData origSubData, HedwigClientImpl client) {
this.origSubData = origSubData;
this.client = client;
this.sub = client.getSubscriber();
this.cfg = client.getConfiguration();
- this.messageHandler = messageHandler;
}
class SubscribeReconnectRetryTask extends TimerTask {
@@ -77,17 +77,17 @@ public class SubscribeReconnectCallback
// Now we want to restart delivery for the subscription channel only
// if delivery was started at the time the original subscribe channel
// was disconnected.
- if (messageHandler != null) {
- try {
- sub.startDelivery(origSubData.topic, origSubData.subscriberId, messageHandler);
- } catch (ClientNotSubscribedException e) {
- // This exception should never be thrown here but just in case,
- // log an error and just keep retrying the subscribe request.
- logger.error("Subscribe was successful but error starting delivery for topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8(), e);
- retrySubscribeRequest();
- }
+ try {
+ sub.restartDelivery(origSubData.topic, origSubData.subscriberId);
+ } catch (ClientNotSubscribedException e) {
+ // This exception should never be thrown here but just in case,
+ // log an error and just keep retrying the subscribe request.
+ logger.error("Subscribe was successful but error starting delivery for topic: "
+ + origSubData.topic.toStringUtf8() + ", subscriberId: "
+ + origSubData.subscriberId.toStringUtf8(), e);
+ retrySubscribeRequest();
+ } catch (AlreadyStartDeliveryException asde) {
+ // should not reach here
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Tue Apr 24 17:37:54 2012
@@ -34,6 +34,7 @@ import org.apache.hedwig.client.api.Subs
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.exceptions.PubSubException;
@@ -68,6 +69,11 @@ public class HedwigSubscriber implements
// Channel Pipeline.
protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+ // Concurrent Map to store Message handler for each topic + sub id combination.
+ // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
+ // user set when connection is recovered
+ protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler= new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
+
protected final HedwigClientImpl client;
protected final ClientConfiguration cfg;
@@ -463,7 +469,18 @@ public class HedwigSubscriber implements
}
public void startDelivery(final ByteString topic, final ByteString subscriberId, MessageHandler messageHandler)
- throws ClientNotSubscribedException {
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ startDelivery(topic, subscriberId, messageHandler, false);
+ }
+
+ public void restartDelivery(final ByteString topic, final ByteString subscriberId)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ startDelivery(topic, subscriberId, null, true);
+ }
+
+ private void startDelivery(final ByteString topic, final ByteString subscriberId,
+ MessageHandler messageHandler, boolean restart)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
if (logger.isDebugEnabled())
logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
+ subscriberId.toStringUtf8());
@@ -482,6 +499,25 @@ public class HedwigSubscriber implements
// Register the MessageHandler with the subscribe Channel's
// Response Handler.
Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+
+ // Setting the handler and making the channel readable must be kept in sync,
+ // because connection recovery and the user might both call this at the same time.
+
+ MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
+ if (restart) {
+ // restart using existing msg handler
+ messageHandler = existedMsgHandler;
+ } else {
+ // someone has already started delivery and has not stopped it yet
+ if (null != existedMsgHandler) {
+ throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
+ }
+ if (messageHandler != null) {
+ if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
+ throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
+ }
+ }
+ }
HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
.setMessageHandler(messageHandler);
// Now make the TopicSubscriber Channel readable (it is set to not be
@@ -521,6 +557,7 @@ public class HedwigSubscriber implements
Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
.setMessageHandler(null);
+ this.topicSubscriber2MessageHandler.remove(topicSubscriber);
// Now make the TopicSubscriber channel not-readable. This will buffer
// up messages if any are sent from the server. Note that this is an
// asynchronous call. If this fails (not likely), the futureListener
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Tue Apr 24 17:37:54 2012
@@ -318,7 +318,7 @@ public class ResponseHandler extends Sim
// hook so after the subscribe reconnect has completed, delivery for
// that topic subscriber should also be restarted (if it was that
// case before the channel disconnect).
- origSubData.callback = new SubscribeReconnectCallback(origSubData, client, subHandler.getMessageHandler());
+ origSubData.callback = new SubscribeReconnectCallback(origSubData, client);
origSubData.context = null;
if (logger.isDebugEnabled())
logger.debug("Disconnected subscribe channel so reconnect with origSubData: " + origSubData);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Tue Apr 24 17:37:54 2012
@@ -52,6 +52,7 @@ public class HedwigProxy {
ChannelGroup allChannels;
Map<OperationType, Handler> handlers;
ProxyConfiguration cfg;
+ ChannelTracker tracker;
public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler)
throws InterruptedException {
@@ -87,9 +88,14 @@ public class HedwigProxy {
this(conf, new TerminateJVMExceptionHandler());
}
+ // used for testing
+ public ChannelTracker getChannelTracker() {
+ return tracker;
+ }
+
protected void initializeHandlers() {
handlers = new HashMap<OperationType, Handler>();
- ChannelTracker tracker = new ChannelTracker(client.getSubscriber());
+ tracker = new ChannelTracker(client.getSubscriber());
handlers.put(OperationType.PUBLISH, new ProxyPublishHander(client.getPublisher()));
handlers.put(OperationType.SUBSCRIBE, new ProxySubscribeHandler(client.getSubscriber(), tracker));
@@ -130,6 +136,10 @@ public class HedwigProxy {
return handlers.get(OperationType.START_DELIVERY);
}
+ public Handler getStopDeliveryHandler() {
+ return handlers.get(OperationType.STOP_DELIVERY);
+ }
+
/**
* @param args
*/
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java Tue Apr 24 17:37:54 2012
@@ -26,6 +26,7 @@ import org.jboss.netty.channel.ChannelFu
import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -119,10 +120,11 @@ public class ProxyStartDeliveryHandler i
// channel and so on
logger.error("Unexpected: No subscription when attempting to start delivery", e);
throw new RuntimeException(e);
+ } catch (AlreadyStartDeliveryException e) {
+ logger.error("Unexpected: Already start delivery when attempting to start delivery", e);
+ throw new RuntimeException(e);
}
-
-
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java Tue Apr 24 17:37:54 2012
@@ -50,7 +50,7 @@ public class ProxyStopDeliveryHandler im
}
final ByteString topic = request.getTopic();
- final ByteString subscriberId = request.getStartDeliveryRequest().getSubscriberId();
+ final ByteString subscriberId = request.getStopDeliveryRequest().getSubscriberId();
synchronized (tracker) {
try {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Tue Apr 24 17:37:54 2012
@@ -34,6 +34,7 @@ import org.apache.zookeeper.ZooKeeper;
import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.netty.HedwigSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -243,6 +244,10 @@ public class RegionManager implements Su
LOGGER.error(
"[" + myRegion.toStringUtf8() + "] cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
mcb.operationFailed(ctx, ex);
+ } catch (AlreadyStartDeliveryException ex) {
+ LOGGER.error("[" + myRegion.toStringUtf8() + "] cross-region start-delivery failed for topic "
+ + topic.toStringUtf8(), ex);
+ mcb.operationFailed(ctx, new PubSubException.UnexpectedConditionException("cross-region start-delivery failed : " + ex.getMessage()));
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Tue Apr 24 17:37:54 2012
@@ -35,6 +35,7 @@ import org.apache.hedwig.client.api.Mess
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.HedwigClient;
import org.apache.hedwig.client.api.Client;
import org.apache.hedwig.client.api.Publisher;
@@ -48,6 +49,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
@@ -254,6 +256,20 @@ public class TestHedwigHub extends Hedwi
}
}
+ protected void stopDelivery(ByteString topic, ByteString subscriberId) throws Exception {
+ stopDelivery(subscriber, topic, subscriberId);
+ }
+
+ protected void stopDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId) throws Exception {
+ subscriber.stopDelivery(topic, subscriberId);
+ if (mode == Mode.PROXY) {
+ PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
+ .setTopic(topic).setTxnId(1).setType(OperationType.STOP_DELIVERY).setStopDeliveryRequest(
+ StopDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
+ proxy.getStopDeliveryHandler().handleRequest(request, proxy.getChannelTracker().getChannel(topic, subscriberId));
+ }
+ }
+
protected void publishBatch(int batchSize, boolean expected, boolean messagesToBeConsumed, int loop) throws Exception {
if (logger.isDebugEnabled())
logger.debug("Publishing " + loop + " batch of messages.");
@@ -487,6 +503,20 @@ public class TestHedwigHub extends Hedwi
}
@Test
+ public void testStartDeliveryTwice() throws Exception {
+ ByteString topic = getTopic(0);
+ subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+ null);
+ assertTrue(queue.take());
+ startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+ try {
+ startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+ fail("Should not reach here!");
+ } catch (AlreadyStartDeliveryException e) {
+ }
+ }
+
+ @Test
public void testStopDelivery() throws Exception {
ByteString topic = getTopic(0);
subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
@@ -497,7 +527,7 @@ public class TestHedwigHub extends Hedwi
assertTrue(queue.take());
assertTrue(consumeQueue.take());
// Stop the delivery for this subscription
- subscriber.stopDelivery(topic, localSubscriberId);
+ stopDelivery(topic, localSubscriberId);
// Publish some more messages so they are queued up to be delivered to
// the client
int batchSize = 10;