You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/04/24 19:37:55 UTC

svn commit: r1329883 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/api/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-server...

Author: ivank
Date: Tue Apr 24 17:37:54 2012
New Revision: 1329883

URL: http://svn.apache.org/viewvc?rev=1329883&view=rev
Log:
BOOKKEEPER-56: Race condition of message handler in connection recovery in Hedwig client (sijie & Gavin Li via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Apr 24 17:37:54 2012
@@ -102,6 +102,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-197: HedwigConsole uses the same file to load bookkeeper client config and hub server config (sijie)
 
+        BOOKKEEPER-56: Race condition of message handler in connection recovery in Hedwig client (sijie & Gavin Li via ivank)
+
       bookkeeper-benchmark/
 	
 	BOOKKEEPER-207: BenchBookie doesn't run correctly (ivank via fpj)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Tue Apr 24 17:37:54 2012
@@ -20,6 +20,7 @@ package org.apache.hedwig.client.api;
 import java.util.List;
 
 import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
 import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
@@ -232,9 +233,11 @@ public interface Subscriber {
      *            Message Handler that will consume the subscribed messages
      * @throws ClientNotSubscribedException
      *             If the client is not currently subscribed to the topic
+     * @throws AlreadyStartDeliveryException
+     *             If delivery was already started with a message handler that has not been stopped yet.
      */
     public void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler messageHandler)
-            throws ClientNotSubscribedException;
+            throws ClientNotSubscribedException, AlreadyStartDeliveryException;
 
     /**
      * Stop delivery of messages for this topic and subscriberId.

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java Tue Apr 24 17:37:54 2012
@@ -25,9 +25,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.client.netty.HedwigClientImpl;
 import org.apache.hedwig.client.netty.HedwigSubscriber;
 import org.apache.hedwig.exceptions.PubSubException;
+
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.util.Callback;
 
@@ -48,15 +50,13 @@ public class SubscribeReconnectCallback 
     private final HedwigClientImpl client;
     private final HedwigSubscriber sub;
     private final ClientConfiguration cfg;
-    private final MessageHandler messageHandler;
 
     // Constructor
-    public SubscribeReconnectCallback(PubSubData origSubData, HedwigClientImpl client, MessageHandler messageHandler) {
+    public SubscribeReconnectCallback(PubSubData origSubData, HedwigClientImpl client) {
         this.origSubData = origSubData;
         this.client = client;
         this.sub = client.getSubscriber();
         this.cfg = client.getConfiguration();
-        this.messageHandler = messageHandler;
     }
 
     class SubscribeReconnectRetryTask extends TimerTask {
@@ -77,17 +77,17 @@ public class SubscribeReconnectCallback 
         // Now we want to restart delivery for the subscription channel only
         // if delivery was started at the time the original subscribe channel
         // was disconnected.
-        if (messageHandler != null) {
-            try {
-                sub.startDelivery(origSubData.topic, origSubData.subscriberId, messageHandler);
-            } catch (ClientNotSubscribedException e) {
-                // This exception should never be thrown here but just in case,
-                // log an error and just keep retrying the subscribe request.
-                logger.error("Subscribe was successful but error starting delivery for topic: "
-                             + origSubData.topic.toStringUtf8() + ", subscriberId: "
-                             + origSubData.subscriberId.toStringUtf8(), e);
-                retrySubscribeRequest();
-            }
+        try {
+            sub.restartDelivery(origSubData.topic, origSubData.subscriberId);
+        } catch (ClientNotSubscribedException e) {
+            // This exception should never be thrown here but just in case,
+            // log an error and just keep retrying the subscribe request.
+            logger.error("Subscribe was successful but error starting delivery for topic: "
+                         + origSubData.topic.toStringUtf8() + ", subscriberId: "
+                         + origSubData.subscriberId.toStringUtf8(), e);
+            retrySubscribeRequest();
+        } catch (AlreadyStartDeliveryException asde) {
+            // should not reach here
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Tue Apr 24 17:37:54 2012
@@ -34,6 +34,7 @@ import org.apache.hedwig.client.api.Subs
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
 import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
 import org.apache.hedwig.client.handlers.PubSubCallback;
 import org.apache.hedwig.exceptions.PubSubException;
@@ -68,6 +69,11 @@ public class HedwigSubscriber implements
     // Channel Pipeline.
     protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
 
+    // Concurrent Map to store Message handler for each topic + sub id combination.
+    // Store it here instead of in SubscriberResponseHandler so that the handler set by the
+    // user is not lost when the connection is recovered.
+    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler= new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
+
     protected final HedwigClientImpl client;
     protected final ClientConfiguration cfg;
 
@@ -463,7 +469,18 @@ public class HedwigSubscriber implements
     }
 
     public void startDelivery(final ByteString topic, final ByteString subscriberId, MessageHandler messageHandler)
-            throws ClientNotSubscribedException {
+            throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        startDelivery(topic, subscriberId, messageHandler, false);
+    }
+
+    public void restartDelivery(final ByteString topic, final ByteString subscriberId)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        startDelivery(topic, subscriberId, null, true);
+    }
+
+    private void startDelivery(final ByteString topic, final ByteString subscriberId,
+                               MessageHandler messageHandler, boolean restart)
+        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
         if (logger.isDebugEnabled())
             logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
                          + subscriberId.toStringUtf8());
@@ -482,6 +499,25 @@ public class HedwigSubscriber implements
         // Register the MessageHandler with the subscribe Channel's
         // Response Handler.
         Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+
+        // Setting the handler and making the channel readable must be kept in sync, because
+        // connection recovery and the user might both call this method at the same time.
+
+        MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
+        if (restart) {
+            // restart using existing msg handler 
+            messageHandler = existedMsgHandler;
+        } else {
+            // someone has already started delivery and has not stopped it yet
+            if (null != existedMsgHandler) {
+                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
+            }
+            if (messageHandler != null) {
+                if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
+                    throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
+                }
+            }
+        }
         HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
         .setMessageHandler(messageHandler);
         // Now make the TopicSubscriber Channel readable (it is set to not be
@@ -521,6 +557,7 @@ public class HedwigSubscriber implements
         Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
         HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
         .setMessageHandler(null);
+        this.topicSubscriber2MessageHandler.remove(topicSubscriber);
         // Now make the TopicSubscriber channel not-readable. This will buffer
         // up messages if any are sent from the server. Note that this is an
         // asynchronous call. If this fails (not likely), the futureListener

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Tue Apr 24 17:37:54 2012
@@ -318,7 +318,7 @@ public class ResponseHandler extends Sim
             // hook so after the subscribe reconnect has completed, delivery for
             // that topic subscriber should also be restarted (if it was that
             // case before the channel disconnect).
-            origSubData.callback = new SubscribeReconnectCallback(origSubData, client, subHandler.getMessageHandler());
+            origSubData.callback = new SubscribeReconnectCallback(origSubData, client);
             origSubData.context = null;
             if (logger.isDebugEnabled())
                 logger.debug("Disconnected subscribe channel so reconnect with origSubData: " + origSubData);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Tue Apr 24 17:37:54 2012
@@ -52,6 +52,7 @@ public class HedwigProxy {
     ChannelGroup allChannels;
     Map<OperationType, Handler> handlers;
     ProxyConfiguration cfg;
+    ChannelTracker tracker;
 
     public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler)
             throws InterruptedException {
@@ -87,9 +88,14 @@ public class HedwigProxy {
         this(conf, new TerminateJVMExceptionHandler());
     }
 
+    // used for testing
+    public ChannelTracker getChannelTracker() {
+        return tracker;
+    }
+
     protected void initializeHandlers() {
         handlers = new HashMap<OperationType, Handler>();
-        ChannelTracker tracker = new ChannelTracker(client.getSubscriber());
+        tracker = new ChannelTracker(client.getSubscriber());
 
         handlers.put(OperationType.PUBLISH, new ProxyPublishHander(client.getPublisher()));
         handlers.put(OperationType.SUBSCRIBE, new ProxySubscribeHandler(client.getSubscriber(), tracker));
@@ -130,6 +136,10 @@ public class HedwigProxy {
         return handlers.get(OperationType.START_DELIVERY);
     }
 
+    public Handler getStopDeliveryHandler() {
+        return handlers.get(OperationType.STOP_DELIVERY);
+    }
+
     /**
      * @param args
      */

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java Tue Apr 24 17:37:54 2012
@@ -26,6 +26,7 @@ import org.jboss.netty.channel.ChannelFu
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -119,10 +120,11 @@ public class ProxyStartDeliveryHandler i
                 // channel and so on
                 logger.error("Unexpected: No subscription when attempting to start delivery", e);
                 throw new RuntimeException(e);
+            } catch (AlreadyStartDeliveryException e) {
+                logger.error("Unexpected: Already start delivery when attempting to start delivery", e);
+                throw new RuntimeException(e);
             }
 
-
-
         }
 
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java Tue Apr 24 17:37:54 2012
@@ -50,7 +50,7 @@ public class ProxyStopDeliveryHandler im
         }
 
         final ByteString topic = request.getTopic();
-        final ByteString subscriberId = request.getStartDeliveryRequest().getSubscriberId();
+        final ByteString subscriberId = request.getStopDeliveryRequest().getSubscriberId();
 
         synchronized (tracker) {
             try {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Tue Apr 24 17:37:54 2012
@@ -34,6 +34,7 @@ import org.apache.zookeeper.ZooKeeper;
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.client.netty.HedwigSubscriber;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -243,6 +244,10 @@ public class RegionManager implements Su
                         LOGGER.error(
                                 "[" + myRegion.toStringUtf8() + "] cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
                     mcb.operationFailed(ctx, ex);
+                } catch (AlreadyStartDeliveryException ex) {
+                    LOGGER.error("[" + myRegion.toStringUtf8() + "] cross-region start-delivery failed for topic "
+                               + topic.toStringUtf8(), ex);
+                    mcb.operationFailed(ctx, new PubSubException.UnexpectedConditionException("cross-region start-delivery failed : " + ex.getMessage()));
                 }
             }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1329883&r1=1329882&r2=1329883&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Tue Apr 24 17:37:54 2012
@@ -35,6 +35,7 @@ import org.apache.hedwig.client.api.Mess
 import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.client.api.Client;
 import org.apache.hedwig.client.api.Publisher;
@@ -48,6 +49,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
@@ -254,6 +256,20 @@ public class TestHedwigHub extends Hedwi
         }
     }
 
+    protected void stopDelivery(ByteString topic, ByteString subscriberId) throws Exception {
+        stopDelivery(subscriber, topic, subscriberId);
+    }
+
+    protected void stopDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId) throws Exception {
+        subscriber.stopDelivery(topic, subscriberId);
+        if (mode == Mode.PROXY) {
+            PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
+                                    .setTopic(topic).setTxnId(1).setType(OperationType.STOP_DELIVERY).setStopDeliveryRequest(
+                                        StopDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
+            proxy.getStopDeliveryHandler().handleRequest(request, proxy.getChannelTracker().getChannel(topic, subscriberId));
+        }
+    }
+
     protected void publishBatch(int batchSize, boolean expected, boolean messagesToBeConsumed, int loop) throws Exception {
         if (logger.isDebugEnabled())
             logger.debug("Publishing " + loop + " batch of messages.");
@@ -487,6 +503,20 @@ public class TestHedwigHub extends Hedwi
     }
 
     @Test
+    public void testStartDeliveryTwice() throws Exception {
+        ByteString topic = getTopic(0);
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+                                  null);
+        assertTrue(queue.take());
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        try {
+            startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+            fail("Should not reach here!");
+        } catch (AlreadyStartDeliveryException e) {
+        }
+    }
+
+    @Test
     public void testStopDelivery() throws Exception {
         ByteString topic = getTopic(0);
         subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
@@ -497,7 +527,7 @@ public class TestHedwigHub extends Hedwi
         assertTrue(queue.take());
         assertTrue(consumeQueue.take());
         // Stop the delivery for this subscription
-        subscriber.stopDelivery(topic, localSubscriberId);
+        stopDelivery(topic, localSubscriberId);
         // Publish some more messages so they are queued up to be delivered to
         // the client
         int batchSize = 10;