You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/14 18:16:18 UTC

svn commit: r1384836 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/test/ hedwig-client/src/main/java/org/apache/hedwig/client/api/ hedwig-client/src/main/jav...

Author: ivank
Date: Fri Sep 14 16:16:17 2012
New Revision: 1384836

URL: http://svn.apache.org/viewvc?rev=1384836&view=rev
Log:
BOOKKEEPER-252: Hedwig: provide a subscription mode to kill other subscription channel when hedwig client is used as a proxy-style server. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Sep 14 16:16:17 2012
@@ -158,6 +158,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-333: server-side message filter (sijie via ivank)
 
+        BOOKKEEPER-252: Hedwig: provide a subscription mode to kill other subscription channel when hedwig client is used as a proxy-style server. (sijie via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h Fri Sep 14 16:16:17 2012
@@ -30,6 +30,18 @@
 
 namespace Hedwig {
 
+  //
+  // A Listener registered for a Subscriber instance to emit events
+  // for those disable resubscribe subscriptions.
+  //
+  class SubscriptionListener {
+  public:
+    virtual void processEvent(const std::string &topic, const std::string &subscriberId,
+                              const Hedwig::SubscriptionEvent event) = 0;
+    virtual ~SubscriptionListener() {};
+  };
+  typedef std::tr1::shared_ptr<SubscriptionListener> SubscriptionListenerPtr;
+
   template<class R>
   class Callback {
   public:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Fri Sep 14 16:16:17 2012
@@ -36,7 +36,7 @@ namespace Hedwig {
     virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) = 0;
     virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) = 0;
     virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) = 0;
-    
+
     virtual void unsubscribe(const std::string& topic, const std::string& subscriberId) = 0;
     virtual void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) = 0;  
 
@@ -53,6 +53,14 @@ namespace Hedwig {
 
     virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
 
+    //
+    // API to register/unregister subscription listeners for receiving
+    // events indicating subscription changes for those disable resubscribe
+    // subscriptions
+    //
+    virtual void addSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
+    virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
+
     virtual ~Subscriber() {}
   };
 };

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Fri Sep 14 16:16:17 2012
@@ -134,6 +134,7 @@ const PubSubRequestPtr PubSubData::getRe
     Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
     subreq->set_subscriberid(subscriberid);
     subreq->set_createorattach(options.createorattach());
+    subreq->set_forceattach(options.forceattach());
     setPreferencesForSubRequest(subreq, options);
   } else if (type == CONSUME) {
     LOG4CXX_DEBUG(logger, "Creating consume request");

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Fri Sep 14 16:16:17 2012
@@ -147,7 +147,8 @@ void SubscriberReconnectCallback::operat
 }
 
 SubscriberClientChannelHandler::SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
-  : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false), should_wait(true)  {
+  : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false),
+    should_wait(true), disconnected(false) {
   LOG4CXX_DEBUG(logger, "Creating SubscriberClientChannelHandler " << this);
 }
 
@@ -178,7 +179,7 @@ void SubscriberClientChannelHandler::mes
 void SubscriberClientChannelHandler::close() {
   closed = true;
 
-  if (channel) {
+  if (channel.get()) {
     channel->kill();
   }
 }
@@ -190,7 +191,7 @@ void SubscriberClientChannelHandler::clo
     return;
   }
   handler->should_wait = false;
-  handler->channelDisconnected(channel, e);
+  handler->reconnect(channel, e);
 }
 
 void SubscriberClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) {
@@ -205,12 +206,40 @@ void SubscriberClientChannelHandler::cha
     return;
   }
 
+  {
+    boost::shared_lock<boost::shared_mutex> lock(disconnected_lock);
+    // some one is reconnecting return
+    if (disconnected) {
+      return;
+    }
+    disconnected = true;
+  }
+
+  // if we have registered the subscription channel listener, disable retry
+  // just trigger listener to let application handle channel disconnected event
+  if (!origData->getSubscriptionOptions().enableresubscribe()) {
+    LOG4CXX_INFO(logger, "Tell subscriber (topic:" << origData->getTopic()
+                         << ", subscriberId:" << origData->getSubscriberId()
+                         << ") his topic has been moved : channel "
+                         << channel.get() << " is disconnected");
+    // remove record of the failed channel from the subscriber
+    client->getSubscriberImpl().closeSubscription(origData->getTopic(), origData->getSubscriberId());
+    // emit the event to notify the client that topic has been moved
+    client->getSubscriberImpl().emitSubscriptionEvent(
+      origData->getTopic(), origData->getSubscriberId(), TOPIC_MOVED);
+  } else {
+    reconnect(channel, e);
+  }
+}
+
+void SubscriberClientChannelHandler::reconnect(const DuplexChannelPtr& channel, const std::exception& e) {
   if (should_wait) {
     int retrywait = client->getConfiguration().getInt(Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
 						      DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
     
-    boost::asio::deadline_timer t(channel->getService(), boost::posix_time::milliseconds(retrywait));
-    t.async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(), 
+    // set reconnect timer
+    reconnectTimer = ReconnectTimerPtr(new boost::asio::deadline_timer(channel->getService(), boost::posix_time::milliseconds(retrywait)));
+    reconnectTimer->async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(),
 			     channel, e, boost::asio::placeholders::error));  
     return;
   }
@@ -387,9 +416,12 @@ void SubscriberImpl::doUnsubscribe(const
 
 void SubscriberImpl::consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) {
   TopicSubscriber t(topic, subscriberId);
-  
-  boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
-  SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+
+  SubscriberClientChannelHandlerPtr handler;
+  {
+    boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    handler = topicsubscriber2handler[t];
+  }
 
   if (handler.get() == 0) {
     LOG4CXX_ERROR(logger, "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")");
@@ -399,6 +431,7 @@ void SubscriberImpl::consume(const std::
   DuplexChannelPtr channel = handler->getChannel();
   if (channel.get() == 0) {
     LOG4CXX_ERROR(logger, "Trying to consume a message on a topic/subscriber pair that don't have a channel. Something fishy going on. Topic: " << topic << " SubscriberId: " << subscriberId << " MessageSeqId: " << messageSeqId.localcomponent());
+    return;
   }
   
   PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);  
@@ -430,23 +463,31 @@ void SubscriberImpl::startDelivery(const
                                    const MessageHandlerCallbackPtr& callback) {
   TopicSubscriber t(topic, subscriberId);
 
-  boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
-  SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+  SubscriberClientChannelHandlerPtr handler;
+  {
+    boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    handler = topicsubscriber2handler[t];
+  }
 
   if (handler.get() == 0) {
     LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
+    throw NotSubscribedException();
   }
   handler->startDelivery(callback);
 }
 
 void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& subscriberId) {
   TopicSubscriber t(topic, subscriberId);
-  
-  boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
-  SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+
+  SubscriberClientChannelHandlerPtr handler;
+  {
+    boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    handler = topicsubscriber2handler[t];
+  }
 
   if (handler.get() == 0) {
-    LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
+    LOG4CXX_ERROR(logger, "Trying to stop deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
+    throw NotSubscribedException();
   }
   handler->stopDelivery();
 }
@@ -484,6 +525,29 @@ const SubscriptionPreferencesPtr& Subscr
   return preferences;
 }
 
+void SubscriberImpl::addSubscriptionListener(SubscriptionListenerPtr& listener) {
+  boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+  listeners.insert(listener);
+}
+
+void SubscriberImpl::removeSubscriptionListener(SubscriptionListenerPtr& listener) {
+  boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+  listeners.erase(listener);
+}
+
+void SubscriberImpl::emitSubscriptionEvent(const std::string& topic,
+                                           const std::string& subscriberId,
+                                           const SubscriptionEvent event) {
+  boost::shared_lock<boost::shared_mutex> lock(listeners_lock);
+  if (0 == listeners.size()) {
+    return;
+  }
+  for (SubscriptionListenerSet::iterator iter = listeners.begin();
+       iter != listeners.end(); ++iter) {
+    (*iter)->processEvent(topic, subscriberId, event);
+  }
+}
+
 /**
    takes ownership of txn
 */

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Fri Sep 14 16:16:17 2012
@@ -121,6 +121,8 @@ namespace Hedwig {
     void setChannel(const DuplexChannelPtr& channel);
     DuplexChannelPtr& getChannel();
 
+    void reconnect(const DuplexChannelPtr& channel, const std::exception& e);
+
     static void reconnectTimerComplete(const SubscriberClientChannelHandlerPtr handler, const DuplexChannelPtr channel, const std::exception e, 
 				       const boost::system::error_code& error);
 
@@ -135,7 +137,18 @@ namespace Hedwig {
     PubSubDataPtr origData;
     DuplexChannelPtr channel;
     bool closed;
+
+    boost::shared_mutex disconnected_lock;
     bool should_wait;
+    bool disconnected;
+    typedef boost::shared_ptr<boost::asio::deadline_timer> ReconnectTimerPtr;
+    ReconnectTimerPtr reconnectTimer;
+  };
+
+  struct SubscriptionListenerPtrHash : public std::unary_function<SubscriptionListenerPtr, size_t> {
+    size_t operator()(const Hedwig::SubscriptionListenerPtr& listener) const {
+      return reinterpret_cast<size_t>(listener.get());
+    }
   };
 
   class SubscriberImpl : public Subscriber {
@@ -167,6 +180,12 @@ namespace Hedwig {
     void doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler);
     void doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
 
+    virtual void addSubscriptionListener(SubscriptionListenerPtr& listener);
+    virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener);
+    void emitSubscriptionEvent(const std::string& topic,
+                               const std::string& subscriberId,
+                               const SubscriptionEvent event);
+
   private:
     void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
                                     const SubscriptionPreferences& preferences);
@@ -180,6 +199,10 @@ namespace Hedwig {
     boost::shared_mutex topicsubscriber2handler_lock;	    
     std::tr1::unordered_map<TopicSubscriber, SubscriptionPreferencesPtr, TopicSubscriberHash> topicsubscriber2preferences;
     boost::shared_mutex topicsubscriber2preferences_lock;	    
+
+    typedef std::tr1::unordered_set<SubscriptionListenerPtr, SubscriptionListenerPtrHash> SubscriptionListenerSet;
+    SubscriptionListenerSet listeners;
+    boost::shared_mutex listeners_lock;
   };
 
 };

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp Fri Sep 14 16:16:17 2012
@@ -164,3 +164,54 @@ TEST(SubscribeTest, testSubscribeTwice) 
   ASSERT_THROW(sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH), Hedwig::AlreadySubscribedException);
 }
 
+TEST(SubscribeTest, testAsyncSubcribeForceAttach) {
+  Hedwig::Configuration* conf = new TestServerConfiguration();
+  std::auto_ptr<Hedwig::Configuration> confptr(conf);
+  // client 1
+  Hedwig::Client* client1 = new Hedwig::Client(*conf);
+  std::auto_ptr<Hedwig::Client> client1ptr(client1);
+  Hedwig::Subscriber& sub1 = client1->getSubscriber();
+  // client 2
+  Hedwig::Client* client2 = new Hedwig::Client(*conf);
+  std::auto_ptr<Hedwig::Client> client2ptr(client2);
+  Hedwig::Subscriber& sub2 = client2->getSubscriber();
+
+  SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+  std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
+  Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+
+  SimpleWaitCondition* lcond1 = new SimpleWaitCondition();
+  std::auto_ptr<SimpleWaitCondition> lcond1ptr(lcond1);
+  Hedwig::SubscriptionListenerPtr listener1(new TestSubscriptionListener(lcond1));
+
+  Hedwig::SubscriptionOptions options;
+  options.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+  options.set_forceattach(true);
+  options.set_enableresubscribe(false);
+
+  sub1.addSubscriptionListener(listener1);
+
+  sub1.asyncSubscribe("asyncSubscribeForceAttach", "mysub",
+                      options, testcb1);
+  cond1->wait();
+  ASSERT_TRUE(cond1->wasSuccess());
+
+  // sub2 subscribe would force close the channel of sub1
+  SimpleWaitCondition* cond2 = new SimpleWaitCondition();
+  std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2);
+  Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2));
+
+  Hedwig::SubscriptionListenerPtr listener2(new TestSubscriptionListener(0));
+
+  sub2.addSubscriptionListener(listener2);
+
+  sub2.asyncSubscribe("asyncSubscribeForceAttach", "mysub",
+                      options, testcb2);
+  cond2->wait();
+  ASSERT_TRUE(cond2->wasSuccess());
+
+  // sub1 would receive the disconnect event
+  lcond1->wait();
+
+  sub1.unsubscribe("asyncSubscribeForceAttach", "mysub");
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Fri Sep 14 16:16:17 2012
@@ -113,6 +113,26 @@ private:
   SimpleWaitCondition *cond;
 };
 
+class TestSubscriptionListener : public Hedwig::SubscriptionListener {
+public:
+  TestSubscriptionListener(SimpleWaitCondition* cond) : cond(cond) {
+  }
+
+  virtual ~TestSubscriptionListener() {}
+
+  virtual void processEvent(const std::string& topic, const std::string& subscriberId,
+                            const Hedwig::SubscriptionEvent event) {
+    if (Hedwig::TOPIC_MOVED == event) {
+      if (cond) {
+        cond->setSuccess(true);
+        cond->notify();
+      }
+    }
+  }
+
+private:
+  SimpleWaitCondition *cond;
+};
 
 class TestServerConfiguration : public Hedwig::Configuration {
 public:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Fri Sep 14 16:16:17 2012
@@ -31,6 +31,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.SubscriptionListener;
 
 /**
  * Interface to define the client Subscriber API.
@@ -349,4 +350,21 @@ public interface Subscriber {
     public void asyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<Void> callback,
                                        Object context);
 
+    /**
+     * Register a subscription listener which get notified about subscription
+     * event indicating a state of a subscription that subscribed disable
+     * resubscribe logic.
+     *
+     * @param listener
+     *          Subscription Listener
+     */
+    public void addSubscriptionListener(SubscriptionListener listener);
+
+    /**
+     * Unregister a subscription listener.
+     *
+     * @param listener
+     *          Subscription Listener
+     */
+    public void removeSubscriptionListener(SubscriptionListener listener);
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Fri Sep 14 16:16:17 2012
@@ -54,6 +54,10 @@ public class HedwigClientImpl implements
 
     private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
 
+    // Empty Topic List
+    private ConcurrentLinkedQueue<ByteString> EMPTY_TOPIC_LIST =
+        new ConcurrentLinkedQueue<ByteString>();
+
     // Global counter used for generating unique transaction ID's for
     // publish and subscribe requests
     protected final AtomicLong globalCounter = new AtomicLong();
@@ -373,6 +377,31 @@ public class HedwigClientImpl implements
         }
     }
 
+    // If a subscribe channel goes down, the topic might have moved.
+    // We only clear out that topic for the host and not all cached information.
+    public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Clearing topic: " + topic.toStringUtf8() + " for host: "
+                    + host);
+        }
+        if (topic2Host.remove(topic, host)) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Removed topic to host mapping for topic: " + topic.toStringUtf8()
+                           + " and host: " + host);
+            }
+        }
+        ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
+        if (null != topicsForHost && topicsForHost.remove(topic)) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Removed topic: " + topic.toStringUtf8() + " from host: " + host);
+            }
+            if (topicsForHost.isEmpty()) {
+                // remove only topic list is empty
+                host2Topics.remove(host, EMPTY_TOPIC_LIST);
+            }
+        }
+    }
+
     // Public getter to see if the client has been stopped.
     public boolean hasStopped() {
         return isStopped;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Fri Sep 14 16:16:17 2012
@@ -18,9 +18,12 @@
 package org.apache.hedwig.client.netty;
 
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
 import org.apache.hedwig.protocol.PubSubProtocol;
@@ -54,11 +57,13 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.SubscriptionListener;
 
 /**
  * This is the Hedwig Netty specific implementation of the Subscriber interface.
@@ -82,6 +87,8 @@ public class HedwigSubscriber implements
     // user set when connection is recovered
     protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler= new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
 
+    protected final CopyOnWriteArraySet<SubscriptionListener> listeners;
+
     protected final HedwigClientImpl client;
     protected final ClientConfiguration cfg;
     private final Object closeLock = new Object();
@@ -90,6 +97,22 @@ public class HedwigSubscriber implements
     public HedwigSubscriber(HedwigClientImpl client) {
         this.client = client;
         this.cfg = client.getConfiguration();
+        this.listeners = new CopyOnWriteArraySet<SubscriptionListener>();
+    }
+
+    public void addSubscriptionListener(SubscriptionListener listener) {
+        listeners.add(listener); 
+    }
+
+    public void removeSubscriptionListener(SubscriptionListener listener) {
+        listeners.remove(listener);
+    }
+
+    void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
+                               SubscriptionEvent event) {
+        for (SubscriptionListener listener : listeners) {
+            listener.processEvent(topic, subscriberId, event);
+        }
     }
 
     // Private method that holds the common logic for doing synchronous
@@ -424,6 +447,7 @@ public class HedwigSubscriber implements
             SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
             subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
             subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
+            subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach());
             // For now, all subscribes should wait for all cross-regional
             // subscriptions to be established before returning.
             subscribeRequestBuilder.setSynchronous(true);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Fri Sep 14 16:16:17 2012
@@ -47,7 +47,9 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.util.SubscriptionListener;
 
 @ChannelPipelineCoverage("all")
 public class ResponseHandler extends SimpleChannelHandler {
@@ -301,7 +303,9 @@ public class ResponseHandler extends Sim
             // Subscribe channel disconnected so first close and clear all
             // cached Channel data set up for this topic subscription.
             sub.closeSubscription(origSubData.topic, origSubData.subscriberId);
-            client.clearAllTopicsForHost(host);
+            // a subscription channel disconnecteda because topic has moved.
+            // just clear the entry for the given topic
+            client.clearHostForTopic(origSubData.topic, host);
             // Since the connection to the server host that was responsible
             // for the topic died, we are not sure about the state of that
             // server. Resend the original subscribe request data to the default
@@ -309,14 +313,24 @@ public class ResponseHandler extends Sim
             // contacted or attempted to from this request as we are starting a
             // "fresh" subscribe request.
             origSubData.clearServersList();
-            // Set a new type of VoidCallback for this async call. We need this
-            // hook so after the subscribe reconnect has completed, delivery for
-            // that topic subscriber should also be restarted (if it was that
-            // case before the channel disconnect).
-            origSubData.setCallback(new SubscribeReconnectCallback(origSubData, client));
-            origSubData.context = null;
-            logger.debug("Disconnected subscribe channel so reconnect with origSubData: {}", origSubData);
-            client.doConnect(origSubData, cfg.getDefaultServerHost());
+            // do resubscribe if the subscription enables it
+            if (origSubData.options.getEnableResubscribe()) {
+                // Set a new type of VoidCallback for this async call. We need this
+                // hook so after the subscribe reconnect has completed, delivery for
+                // that topic subscriber should also be restarted (if it was that
+                // case before the channel disconnect).
+                origSubData.setCallback(new SubscribeReconnectCallback(origSubData, client));
+                origSubData.context = null;
+                // Clear the shouldClaim flag
+                origSubData.shouldClaim = false;
+                logger.debug("Disconnected subscribe channel so reconnect with origSubData: {}", origSubData);
+                client.doConnect(origSubData, cfg.getDefaultServerHost());
+            } else {
+                logger.info("Subscription channel for (topic:{}, subscriber:{}) is disconnected.",
+                            origSubData.topic.toStringUtf8(), origSubData.subscriberId.toStringUtf8());
+                sub.emitSubscriptionEvent(origSubData.topic, origSubData.subscriberId,
+                                          SubscriptionEvent.TOPIC_MOVED);
+            }
         }
 
         // Finally, all of the PubSubRequests that are still waiting for an ack

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java?rev=1384836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java Fri Sep 14 16:16:17 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.util;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+
+/**
+ * This class is used for subscriber to listen on subscription event.
+ */
+public interface SubscriptionListener {
+
+    /**
+     * Process an event from a subscription.
+     * <p>
+     * NOTE: It would be better to not run blocking operations in a
+     *       listener implementation.
+     * </p>
+     *
+     * @param topic
+     *          Topic Name
+     * @param subscriberId
+     *          Subscriber Id
+     * @param event
+     *          Event tell what happened to the subscription.
+     */
+    public void processEvent(ByteString topic, ByteString subscriberId,
+                             SubscriptionEvent event);
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Fri Sep 14 16:16:17 2012
@@ -155,6 +155,75 @@ public final class PubSubProtocol {
     // @@protoc_insertion_point(enum_scope:Hedwig.OperationType)
   }
   
+  public enum SubscriptionEvent
+      implements com.google.protobuf.ProtocolMessageEnum {
+    TOPIC_MOVED(0, 1),
+    SUBSCRIPTION_FORCED_CLOSED(1, 2),
+    ;
+    
+    public static final int TOPIC_MOVED_VALUE = 1;
+    public static final int SUBSCRIPTION_FORCED_CLOSED_VALUE = 2;
+    
+    
+    public final int getNumber() { return value; }
+    
+    public static SubscriptionEvent valueOf(int value) {
+      switch (value) {
+        case 1: return TOPIC_MOVED;
+        case 2: return SUBSCRIPTION_FORCED_CLOSED;
+        default: return null;
+      }
+    }
+    
+    public static com.google.protobuf.Internal.EnumLiteMap<SubscriptionEvent>
+        internalGetValueMap() {
+      return internalValueMap;
+    }
+    private static com.google.protobuf.Internal.EnumLiteMap<SubscriptionEvent>
+        internalValueMap =
+          new com.google.protobuf.Internal.EnumLiteMap<SubscriptionEvent>() {
+            public SubscriptionEvent findValueByNumber(int number) {
+              return SubscriptionEvent.valueOf(number);
+            }
+          };
+    
+    public final com.google.protobuf.Descriptors.EnumValueDescriptor
+        getValueDescriptor() {
+      return getDescriptor().getValues().get(index);
+    }
+    public final com.google.protobuf.Descriptors.EnumDescriptor
+        getDescriptorForType() {
+      return getDescriptor();
+    }
+    public static final com.google.protobuf.Descriptors.EnumDescriptor
+        getDescriptor() {
+      return org.apache.hedwig.protocol.PubSubProtocol.getDescriptor().getEnumTypes().get(2);
+    }
+    
+    private static final SubscriptionEvent[] VALUES = {
+      TOPIC_MOVED, SUBSCRIPTION_FORCED_CLOSED, 
+    };
+    
+    public static SubscriptionEvent valueOf(
+        com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+      if (desc.getType() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "EnumValueDescriptor is not for this type.");
+      }
+      return VALUES[desc.getIndex()];
+    }
+    
+    private final int index;
+    private final int value;
+    
+    private SubscriptionEvent(int index, int value) {
+      this.index = index;
+      this.value = value;
+    }
+    
+    // @@protoc_insertion_point(enum_scope:Hedwig.SubscriptionEvent)
+  }
+  
   public enum StatusCode
       implements com.google.protobuf.ProtocolMessageEnum {
     SUCCESS(0, 0),
@@ -251,7 +320,7 @@ public final class PubSubProtocol {
     }
     public static final com.google.protobuf.Descriptors.EnumDescriptor
         getDescriptor() {
-      return org.apache.hedwig.protocol.PubSubProtocol.getDescriptor().getEnumTypes().get(2);
+      return org.apache.hedwig.protocol.PubSubProtocol.getDescriptor().getEnumTypes().get(3);
     }
     
     private static final StatusCode[] VALUES = {
@@ -6218,6 +6287,10 @@ public final class PubSubProtocol {
     boolean hasPreferences();
     org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences();
     org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder();
+    
+    // optional bool forceAttach = 7 [default = false];
+    boolean hasForceAttach();
+    boolean getForceAttach();
   }
   public static final class SubscribeRequest extends
       com.google.protobuf.GeneratedMessage
@@ -6373,12 +6446,23 @@ public final class PubSubProtocol {
       return preferences_;
     }
     
+    // optional bool forceAttach = 7 [default = false];
+    public static final int FORCEATTACH_FIELD_NUMBER = 7;
+    private boolean forceAttach_;
+    public boolean hasForceAttach() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public boolean getForceAttach() {
+      return forceAttach_;
+    }
+    
     private void initFields() {
       subscriberId_ = com.google.protobuf.ByteString.EMPTY;
       createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
       synchronous_ = false;
       messageBound_ = 0;
       preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+      forceAttach_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6411,6 +6495,9 @@ public final class PubSubProtocol {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeMessage(6, preferences_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(7, forceAttach_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -6440,6 +6527,10 @@ public final class PubSubProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(6, preferences_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(7, forceAttach_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6579,6 +6670,8 @@ public final class PubSubProtocol {
           preferencesBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000010);
+        forceAttach_ = false;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -6641,6 +6734,10 @@ public final class PubSubProtocol {
         } else {
           result.preferences_ = preferencesBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.forceAttach_ = forceAttach_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6672,6 +6769,9 @@ public final class PubSubProtocol {
         if (other.hasPreferences()) {
           mergePreferences(other.getPreferences());
         }
+        if (other.hasForceAttach()) {
+          setForceAttach(other.getForceAttach());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6742,6 +6842,11 @@ public final class PubSubProtocol {
               setPreferences(subBuilder.buildPartial());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000020;
+              forceAttach_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -6928,6 +7033,27 @@ public final class PubSubProtocol {
         return preferencesBuilder_;
       }
       
+      // optional bool forceAttach = 7 [default = false];
+      private boolean forceAttach_ ;
+      public boolean hasForceAttach() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public boolean getForceAttach() {
+        return forceAttach_;
+      }
+      public Builder setForceAttach(boolean value) {
+        bitField0_ |= 0x00000020;
+        forceAttach_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearForceAttach() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        forceAttach_ = false;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Hedwig.SubscribeRequest)
     }
     
@@ -6942,6 +7068,10 @@ public final class PubSubProtocol {
   public interface SubscriptionOptionsOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
     
+    // optional bool forceAttach = 1 [default = false];
+    boolean hasForceAttach();
+    boolean getForceAttach();
+    
     // optional .Hedwig.SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
     boolean hasCreateOrAttach();
     org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach getCreateOrAttach();
@@ -6958,6 +7088,10 @@ public final class PubSubProtocol {
     // optional string messageFilter = 5;
     boolean hasMessageFilter();
     String getMessageFilter();
+    
+    // optional bool enableResubscribe = 7 [default = true];
+    boolean hasEnableResubscribe();
+    boolean getEnableResubscribe();
   }
   public static final class SubscriptionOptions extends
       com.google.protobuf.GeneratedMessage
@@ -6988,11 +7122,21 @@ public final class PubSubProtocol {
     }
     
     private int bitField0_;
+    // optional bool forceAttach = 1 [default = false];
+    public static final int FORCEATTACH_FIELD_NUMBER = 1;
+    private boolean forceAttach_;
+    public boolean hasForceAttach() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public boolean getForceAttach() {
+      return forceAttach_;
+    }
+    
     // optional .Hedwig.SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
     public static final int CREATEORATTACH_FIELD_NUMBER = 2;
     private org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach_;
     public boolean hasCreateOrAttach() {
-      return ((bitField0_ & 0x00000001) == 0x00000001);
+      return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     public org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach getCreateOrAttach() {
       return createOrAttach_;
@@ -7002,7 +7146,7 @@ public final class PubSubProtocol {
     public static final int MESSAGEBOUND_FIELD_NUMBER = 3;
     private int messageBound_;
     public boolean hasMessageBound() {
-      return ((bitField0_ & 0x00000002) == 0x00000002);
+      return ((bitField0_ & 0x00000004) == 0x00000004);
     }
     public int getMessageBound() {
       return messageBound_;
@@ -7012,7 +7156,7 @@ public final class PubSubProtocol {
     public static final int OPTIONS_FIELD_NUMBER = 4;
     private org.apache.hedwig.protocol.PubSubProtocol.Map options_;
     public boolean hasOptions() {
-      return ((bitField0_ & 0x00000004) == 0x00000004);
+      return ((bitField0_ & 0x00000008) == 0x00000008);
     }
     public org.apache.hedwig.protocol.PubSubProtocol.Map getOptions() {
       return options_;
@@ -7025,7 +7169,7 @@ public final class PubSubProtocol {
     public static final int MESSAGEFILTER_FIELD_NUMBER = 5;
     private java.lang.Object messageFilter_;
     public boolean hasMessageFilter() {
-      return ((bitField0_ & 0x00000008) == 0x00000008);
+      return ((bitField0_ & 0x00000010) == 0x00000010);
     }
     public String getMessageFilter() {
       java.lang.Object ref = messageFilter_;
@@ -7053,11 +7197,23 @@ public final class PubSubProtocol {
       }
     }
     
+    // optional bool enableResubscribe = 7 [default = true];
+    public static final int ENABLERESUBSCRIBE_FIELD_NUMBER = 7;
+    private boolean enableResubscribe_;
+    public boolean hasEnableResubscribe() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public boolean getEnableResubscribe() {
+      return enableResubscribe_;
+    }
+    
     private void initFields() {
+      forceAttach_ = false;
       createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
       messageBound_ = 0;
       options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
       messageFilter_ = "";
+      enableResubscribe_ = true;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7072,17 +7228,23 @@ public final class PubSubProtocol {
                         throws java.io.IOException {
       getSerializedSize();
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        output.writeEnum(2, createOrAttach_.getNumber());
+        output.writeBool(1, forceAttach_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeUInt32(3, messageBound_);
+        output.writeEnum(2, createOrAttach_.getNumber());
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
-        output.writeMessage(4, options_);
+        output.writeUInt32(3, messageBound_);
       }
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(4, options_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBytes(5, getMessageFilterBytes());
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(7, enableResubscribe_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -7094,20 +7256,28 @@ public final class PubSubProtocol {
       size = 0;
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeEnumSize(2, createOrAttach_.getNumber());
+          .computeBoolSize(1, forceAttach_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeUInt32Size(3, messageBound_);
+          .computeEnumSize(2, createOrAttach_.getNumber());
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeMessageSize(4, options_);
+          .computeUInt32Size(3, messageBound_);
       }
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(4, options_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(5, getMessageFilterBytes());
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(7, enableResubscribe_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -7233,18 +7403,22 @@ public final class PubSubProtocol {
       
       public Builder clear() {
         super.clear();
-        createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
+        forceAttach_ = false;
         bitField0_ = (bitField0_ & ~0x00000001);
-        messageBound_ = 0;
+        createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
         bitField0_ = (bitField0_ & ~0x00000002);
+        messageBound_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         if (optionsBuilder_ == null) {
           options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
         } else {
           optionsBuilder_.clear();
         }
-        bitField0_ = (bitField0_ & ~0x00000004);
-        messageFilter_ = "";
         bitField0_ = (bitField0_ & ~0x00000008);
+        messageFilter_ = "";
+        bitField0_ = (bitField0_ & ~0x00000010);
+        enableResubscribe_ = true;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -7286,23 +7460,31 @@ public final class PubSubProtocol {
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
-        result.createOrAttach_ = createOrAttach_;
+        result.forceAttach_ = forceAttach_;
         if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
           to_bitField0_ |= 0x00000002;
         }
-        result.messageBound_ = messageBound_;
+        result.createOrAttach_ = createOrAttach_;
         if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
           to_bitField0_ |= 0x00000004;
         }
+        result.messageBound_ = messageBound_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
         if (optionsBuilder_ == null) {
           result.options_ = options_;
         } else {
           result.options_ = optionsBuilder_.build();
         }
-        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
-          to_bitField0_ |= 0x00000008;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
         }
         result.messageFilter_ = messageFilter_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.enableResubscribe_ = enableResubscribe_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -7319,6 +7501,9 @@ public final class PubSubProtocol {
       
       public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions other) {
         if (other == org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.getDefaultInstance()) return this;
+        if (other.hasForceAttach()) {
+          setForceAttach(other.getForceAttach());
+        }
         if (other.hasCreateOrAttach()) {
           setCreateOrAttach(other.getCreateOrAttach());
         }
@@ -7331,6 +7516,9 @@ public final class PubSubProtocol {
         if (other.hasMessageFilter()) {
           setMessageFilter(other.getMessageFilter());
         }
+        if (other.hasEnableResubscribe()) {
+          setEnableResubscribe(other.getEnableResubscribe());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7362,19 +7550,24 @@ public final class PubSubProtocol {
               }
               break;
             }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              forceAttach_ = input.readBool();
+              break;
+            }
             case 16: {
               int rawValue = input.readEnum();
               org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach value = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.valueOf(rawValue);
               if (value == null) {
                 unknownFields.mergeVarintField(2, rawValue);
               } else {
-                bitField0_ |= 0x00000001;
+                bitField0_ |= 0x00000002;
                 createOrAttach_ = value;
               }
               break;
             }
             case 24: {
-              bitField0_ |= 0x00000002;
+              bitField0_ |= 0x00000004;
               messageBound_ = input.readUInt32();
               break;
             }
@@ -7388,20 +7581,46 @@ public final class PubSubProtocol {
               break;
             }
             case 42: {
-              bitField0_ |= 0x00000008;
+              bitField0_ |= 0x00000010;
               messageFilter_ = input.readBytes();
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000020;
+              enableResubscribe_ = input.readBool();
+              break;
+            }
           }
         }
       }
       
       private int bitField0_;
       
+      // optional bool forceAttach = 1 [default = false];
+      private boolean forceAttach_ ;
+      public boolean hasForceAttach() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public boolean getForceAttach() {
+        return forceAttach_;
+      }
+      public Builder setForceAttach(boolean value) {
+        bitField0_ |= 0x00000001;
+        forceAttach_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearForceAttach() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        forceAttach_ = false;
+        onChanged();
+        return this;
+      }
+      
       // optional .Hedwig.SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
       private org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
       public boolean hasCreateOrAttach() {
-        return ((bitField0_ & 0x00000001) == 0x00000001);
+        return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       public org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach getCreateOrAttach() {
         return createOrAttach_;
@@ -7410,13 +7629,13 @@ public final class PubSubProtocol {
         if (value == null) {
           throw new NullPointerException();
         }
-        bitField0_ |= 0x00000001;
+        bitField0_ |= 0x00000002;
         createOrAttach_ = value;
         onChanged();
         return this;
       }
       public Builder clearCreateOrAttach() {
-        bitField0_ = (bitField0_ & ~0x00000001);
+        bitField0_ = (bitField0_ & ~0x00000002);
         createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
         onChanged();
         return this;
@@ -7425,19 +7644,19 @@ public final class PubSubProtocol {
       // optional uint32 messageBound = 3 [default = 0];
       private int messageBound_ ;
       public boolean hasMessageBound() {
-        return ((bitField0_ & 0x00000002) == 0x00000002);
+        return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       public int getMessageBound() {
         return messageBound_;
       }
       public Builder setMessageBound(int value) {
-        bitField0_ |= 0x00000002;
+        bitField0_ |= 0x00000004;
         messageBound_ = value;
         onChanged();
         return this;
       }
       public Builder clearMessageBound() {
-        bitField0_ = (bitField0_ & ~0x00000002);
+        bitField0_ = (bitField0_ & ~0x00000004);
         messageBound_ = 0;
         onChanged();
         return this;
@@ -7448,7 +7667,7 @@ public final class PubSubProtocol {
       private com.google.protobuf.SingleFieldBuilder<
           org.apache.hedwig.protocol.PubSubProtocol.Map, org.apache.hedwig.protocol.PubSubProtocol.Map.Builder, org.apache.hedwig.protocol.PubSubProtocol.MapOrBuilder> optionsBuilder_;
       public boolean hasOptions() {
-        return ((bitField0_ & 0x00000004) == 0x00000004);
+        return ((bitField0_ & 0x00000008) == 0x00000008);
       }
       public org.apache.hedwig.protocol.PubSubProtocol.Map getOptions() {
         if (optionsBuilder_ == null) {
@@ -7467,7 +7686,7 @@ public final class PubSubProtocol {
         } else {
           optionsBuilder_.setMessage(value);
         }
-        bitField0_ |= 0x00000004;
+        bitField0_ |= 0x00000008;
         return this;
       }
       public Builder setOptions(
@@ -7478,12 +7697,12 @@ public final class PubSubProtocol {
         } else {
           optionsBuilder_.setMessage(builderForValue.build());
         }
-        bitField0_ |= 0x00000004;
+        bitField0_ |= 0x00000008;
         return this;
       }
       public Builder mergeOptions(org.apache.hedwig.protocol.PubSubProtocol.Map value) {
         if (optionsBuilder_ == null) {
-          if (((bitField0_ & 0x00000004) == 0x00000004) &&
+          if (((bitField0_ & 0x00000008) == 0x00000008) &&
               options_ != org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance()) {
             options_ =
               org.apache.hedwig.protocol.PubSubProtocol.Map.newBuilder(options_).mergeFrom(value).buildPartial();
@@ -7494,7 +7713,7 @@ public final class PubSubProtocol {
         } else {
           optionsBuilder_.mergeFrom(value);
         }
-        bitField0_ |= 0x00000004;
+        bitField0_ |= 0x00000008;
         return this;
       }
       public Builder clearOptions() {
@@ -7504,11 +7723,11 @@ public final class PubSubProtocol {
         } else {
           optionsBuilder_.clear();
         }
-        bitField0_ = (bitField0_ & ~0x00000004);
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       public org.apache.hedwig.protocol.PubSubProtocol.Map.Builder getOptionsBuilder() {
-        bitField0_ |= 0x00000004;
+        bitField0_ |= 0x00000008;
         onChanged();
         return getOptionsFieldBuilder().getBuilder();
       }
@@ -7536,7 +7755,7 @@ public final class PubSubProtocol {
       // optional string messageFilter = 5;
       private java.lang.Object messageFilter_ = "";
       public boolean hasMessageFilter() {
-        return ((bitField0_ & 0x00000008) == 0x00000008);
+        return ((bitField0_ & 0x00000010) == 0x00000010);
       }
       public String getMessageFilter() {
         java.lang.Object ref = messageFilter_;
@@ -7552,23 +7771,44 @@ public final class PubSubProtocol {
         if (value == null) {
     throw new NullPointerException();
   }
-  bitField0_ |= 0x00000008;
+  bitField0_ |= 0x00000010;
         messageFilter_ = value;
         onChanged();
         return this;
       }
       public Builder clearMessageFilter() {
-        bitField0_ = (bitField0_ & ~0x00000008);
+        bitField0_ = (bitField0_ & ~0x00000010);
         messageFilter_ = getDefaultInstance().getMessageFilter();
         onChanged();
         return this;
       }
       void setMessageFilter(com.google.protobuf.ByteString value) {
-        bitField0_ |= 0x00000008;
+        bitField0_ |= 0x00000010;
         messageFilter_ = value;
         onChanged();
       }
       
+      // optional bool enableResubscribe = 7 [default = true];
+      private boolean enableResubscribe_ = true;
+      public boolean hasEnableResubscribe() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public boolean getEnableResubscribe() {
+        return enableResubscribe_;
+      }
+      public Builder setEnableResubscribe(boolean value) {
+        bitField0_ |= 0x00000020;
+        enableResubscribe_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearEnableResubscribe() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        enableResubscribe_ = true;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Hedwig.SubscriptionOptions)
     }
     
@@ -15144,67 +15384,71 @@ public final class PubSubProtocol {
       "yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
       "\017.Hedwig.Message\"d\n\027SubscriptionPreferen" +
       "ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
-      "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\"\243" +
+      "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\"\277" +
       "\002\n\020SubscribeRequest\022\024\n\014subscriberId\030\002 \002(" +
       "\014\022Q\n\016createOrAttach\030\003 \001(\0162\'.Hedwig.Subsc",
       "ribeRequest.CreateOrAttach:\020CREATE_OR_AT" +
       "TACH\022\032\n\013synchronous\030\004 \001(\010:\005false\022\024\n\014mess" +
       "ageBound\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.He" +
-      "dwig.SubscriptionPreferences\">\n\016CreateOr" +
-      "Attach\022\n\n\006CREATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CREATE" +
-      "_OR_ATTACH\020\002\"\266\001\n\023SubscriptionOptions\022Q\n\016" +
-      "createOrAttach\030\002 \001(\0162\'.Hedwig.SubscribeR" +
-      "equest.CreateOrAttach:\020CREATE_OR_ATTACH\022" +
-      "\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007options\030\004 \001(" +
-      "\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030\005 \001(\t\"K\n",
-      "\016ConsumeRequest\022\024\n\014subscriberId\030\002 \002(\014\022#\n" +
-      "\005msgId\030\003 \002(\0132\024.Hedwig.MessageSeqId\"*\n\022Un" +
-      "subscribeRequest\022\024\n\014subscriberId\030\002 \002(\014\"+" +
-      "\n\023StopDeliveryRequest\022\024\n\014subscriberId\030\002 " +
-      "\002(\014\",\n\024StartDeliveryRequest\022\024\n\014subscribe" +
-      "rId\030\002 \002(\014\"\377\001\n\016PubSubResponse\0220\n\017protocol" +
-      "Version\030\001 \002(\0162\027.Hedwig.ProtocolVersion\022&" +
-      "\n\nstatusCode\030\002 \002(\0162\022.Hedwig.StatusCode\022\r" +
-      "\n\005txnId\030\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(\t\022 \n\007mess" +
-      "age\030\005 \001(\0132\017.Hedwig.Message\022\r\n\005topic\030\006 \001(",
-      "\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014responseBody\030" +
-      "\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017PublishRe" +
-      "sponse\022,\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig." +
-      "MessageSeqId\"I\n\021SubscribeResponse\0224\n\013pre" +
-      "ferences\030\002 \001(\0132\037.Hedwig.SubscriptionPref" +
-      "erences\"v\n\014ResponseBody\0220\n\017publishRespon" +
-      "se\030\001 \001(\0132\027.Hedwig.PublishResponse\0224\n\021sub" +
-      "scribeResponse\030\002 \001(\0132\031.Hedwig.SubscribeR" +
-      "esponse\"N\n\021SubscriptionState\022#\n\005msgId\030\001 " +
-      "\002(\0132\024.Hedwig.MessageSeqId\022\024\n\014messageBoun",
-      "d\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005state\030\001 \001" +
-      "(\0132\031.Hedwig.SubscriptionState\0224\n\013prefere" +
-      "nces\030\002 \001(\0132\037.Hedwig.SubscriptionPreferen" +
-      "ces\"O\n\013LedgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020" +
-      "endSeqIdIncluded\030\002 \001(\0132\024.Hedwig.MessageS" +
-      "eqId\"3\n\014LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.H" +
-      "edwig.LedgerRange\":\n\013ManagerMeta\022\023\n\013mana" +
-      "gerImpl\030\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n" +
-      "\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030" +
-      "\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*",
-      "\"\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020\001*p\n\rO" +
-      "perationType\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001" +
-      "\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_" +
-      "DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*\205\004\n\nStatus" +
-      "Code\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221" +
-      "\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_" +
-      "SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224" +
-      "\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020" +
-      "\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SE" +
-      "RVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026I",
-      "NVALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020" +
-      "\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TO" +
-      "PIC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUB" +
-      "SCRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE" +
-      "_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027" +
-      "TOPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED" +
-      "_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apa" +
-      "che.hedwig.protocolH\001"
+      "dwig.SubscriptionPreferences\022\032\n\013forceAtt" +
+      "ach\030\007 \001(\010:\005false\">\n\016CreateOrAttach\022\n\n\006CR" +
+      "EATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002" +
+      "\"\363\001\n\023SubscriptionOptions\022\032\n\013forceAttach\030" +
+      "\001 \001(\010:\005false\022Q\n\016createOrAttach\030\002 \001(\0162\'.H" +
+      "edwig.SubscribeRequest.CreateOrAttach:\020C" +
+      "REATE_OR_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010",
+      "\022\034\n\007options\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessag" +
+      "eFilter\030\005 \001(\t\022\037\n\021enableResubscribe\030\007 \001(\010" +
+      ":\004true\"K\n\016ConsumeRequest\022\024\n\014subscriberId" +
+      "\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.Hedwig.MessageSe" +
+      "qId\"*\n\022UnsubscribeRequest\022\024\n\014subscriberI" +
+      "d\030\002 \002(\014\"+\n\023StopDeliveryRequest\022\024\n\014subscr" +
+      "iberId\030\002 \002(\014\",\n\024StartDeliveryRequest\022\024\n\014" +
+      "subscriberId\030\002 \002(\014\"\377\001\n\016PubSubResponse\0220\n" +
+      "\017protocolVersion\030\001 \002(\0162\027.Hedwig.Protocol" +
+      "Version\022&\n\nstatusCode\030\002 \002(\0162\022.Hedwig.Sta",
+      "tusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(" +
+      "\t\022 \n\007message\030\005 \001(\0132\017.Hedwig.Message\022\r\n\005t" +
+      "opic\030\006 \001(\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014resp" +
+      "onseBody\030\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017" +
+      "PublishResponse\022,\n\016publishedMsgId\030\001 \002(\0132" +
+      "\024.Hedwig.MessageSeqId\"I\n\021SubscribeRespon" +
+      "se\0224\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscri" +
+      "ptionPreferences\"v\n\014ResponseBody\0220\n\017publ" +
+      "ishResponse\030\001 \001(\0132\027.Hedwig.PublishRespon" +
+      "se\0224\n\021subscribeResponse\030\002 \001(\0132\031.Hedwig.S",
+      "ubscribeResponse\"N\n\021SubscriptionState\022#\n" +
+      "\005msgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\022\024\n\014me" +
+      "ssageBound\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005" +
+      "state\030\001 \001(\0132\031.Hedwig.SubscriptionState\0224" +
+      "\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscriptio" +
+      "nPreferences\"O\n\013LedgerRange\022\020\n\010ledgerId\030" +
+      "\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 \001(\0132\024.Hedwig" +
+      ".MessageSeqId\"3\n\014LedgerRanges\022#\n\006ranges\030" +
+      "\001 \003(\0132\023.Hedwig.LedgerRange\":\n\013ManagerMet" +
+      "a\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016managerVersion",
+      "\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022" +
+      "\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopi" +
+      "cs\030\002 \002(\004*\"\n\017ProtocolVersion\022\017\n\013VERSION_O" +
+      "NE\020\001*p\n\rOperationType\022\013\n\007PUBLISH\020\000\022\r\n\tSU" +
+      "BSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022" +
+      "\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*D" +
+      "\n\021SubscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032" +
+      "SUBSCRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusC" +
+      "ode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003" +
+      "\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_S",
+      "UBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003" +
+      "\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226" +
+      "\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SER" +
+      "VICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026IN" +
+      "VALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210" +
+      "\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOP" +
+      "IC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBS" +
+      "CRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_" +
+      "EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027T" +
+      "OPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_",
+      "CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apac" +
+      "he.hedwig.protocolH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15288,7 +15532,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_SubscribeRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscribeRequest_descriptor,
-              new java.lang.String[] { "SubscriberId", "CreateOrAttach", "Synchronous", "MessageBound", "Preferences", },
+              new java.lang.String[] { "SubscriberId", "CreateOrAttach", "Synchronous", "MessageBound", "Preferences", "ForceAttach", },
               org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.Builder.class);
           internal_static_Hedwig_SubscriptionOptions_descriptor =
@@ -15296,7 +15540,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_SubscriptionOptions_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionOptions_descriptor,
-              new java.lang.String[] { "CreateOrAttach", "MessageBound", "Options", "MessageFilter", },
+              new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "EnableResubscribe", },
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.Builder.class);
           internal_static_Hedwig_ConsumeRequest_descriptor =

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Fri Sep 14 16:16:17 2012
@@ -136,17 +136,26 @@ message SubscribeRequest{
 
     // subscription options
     optional SubscriptionPreferences preferences = 6;
+
+    // force attach subscription which would kill existed channel
+    // this option doesn't need to be persisted
+    optional bool forceAttach = 7 [default = false];
 }
 
 // used in client only
 // options are stored in SubscriptionPreferences structure
 message SubscriptionOptions {
+    // force attach subscription which would kill existed channel
+    // this option doesn't need to be persisted
+    optional bool forceAttach = 1 [default = false];
     optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
     optional uint32 messageBound = 3 [default = 0];
     // user customized subscription options
     optional Map options = 4;
     // server-side message filter
     optional string messageFilter = 5;
+    // enable resubscribe
+    optional bool enableResubscribe = 7 [default = true];
 }
 
 message ConsumeRequest{
@@ -168,6 +177,14 @@ message StartDeliveryRequest{
     required bytes subscriberId = 2;
 }
 
+// Identify an event happened for a subscription
+enum SubscriptionEvent {
+    // topic has changed ownership (hub server down or topic released)
+    TOPIC_MOVED = 1;
+    // subscription is force closed by other subscribers
+    SUBSCRIPTION_FORCED_CLOSED = 2;
+}
+
 message PubSubResponse{
     required ProtocolVersion protocolVersion = 1;
     required StatusCode statusCode = 2;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Fri Sep 14 16:16:17 2012
@@ -50,11 +50,14 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.topics.HubInfo;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.util.SubscriptionListener;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 
@@ -238,6 +241,15 @@ public class HedwigConsole {
         
     }
 
+    static class ConsoleSubscriptionListener implements SubscriptionListener {
+
+        @Override
+        public void processEvent(ByteString t, ByteString s, SubscriptionEvent event) {
+            System.out.println("Subscription Channel for (topic:" + t.toStringUtf8() + ", subscriber:"
+                                + s.toStringUtf8() + ") received event : " + event);
+        }
+    }
+
     class SubCmd implements MyCommand {
 
         @Override
@@ -271,7 +283,10 @@ public class HedwigConsole {
             ByteString topic = ByteString.copyFromUtf8(args[1]);
             ByteString subId = ByteString.copyFromUtf8(args[2]);
             try {
-                subscriber.subscribe(topic, subId, mode);
+                SubscriptionOptions options =
+                    SubscriptionOptions.newBuilder().setCreateOrAttach(mode)
+                                       .setForceAttach(false).build();
+                subscriber.subscribe(topic, subId, options);
                 if (receive) {
                     subscriber.startDelivery(topic, subId, consoleHandler);
                     System.out.println("SUB DONE AND RECEIVE");
@@ -400,6 +415,7 @@ public class HedwigConsole {
                                  + " subscriber: " + args[2]);
                 return true;
             }
+            lastConsumedId = subData.getState().getMsgId().getLocalComponent();
             long numMessagesToConsume = Long.parseLong(args[3]);
             long idToConsumed = lastConsumedId + numMessagesToConsume;
             System.out.println("Try to move subscriber(" + args[2] + ") consume ptr of topic(" + args[1]
@@ -852,6 +868,7 @@ public class HedwigConsole {
         hubClient = new HedwigClient(hubClientCfg);
         publisher = hubClient.getPublisher();
         subscriber = hubClient.getSubscriber();
+        subscriber.addSubscriptionListener(new ConsoleSubscriptionListener());
         
         // other parameters
         myRegion = hubServerConf.getMyRegion();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Fri Sep 14 16:16:17 2012
@@ -314,6 +314,10 @@ public class FIFODeliveryManager impleme
         }
 
         public void setNotConnected() {
+            // have closed it.
+            if (!isConnected()) {
+                return;
+            }
             this.connected = false;
             deliveryEndPoint.close();
             // uninitialize filter

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Fri Sep 14 16:16:17 2012
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 
 import com.google.protobuf.ByteString;
@@ -65,6 +66,17 @@ public class SubscribeHandler extends Ba
     // op stats
     private final OpStats subStats;
 
+    private static ChannelFutureListener CLOSE_OLD_CHANNEL_LISTENER = new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+                logger.warn("Failed to close old subscription channel.");
+            } else {
+                logger.debug("Close old subscription channel succeed.");
+            }
+        }
+    };
+
     public SubscribeHandler(TopicManager topicMgr, DeliveryManager deliveryManager, PersistenceManager persistenceMgr,
                             SubscriptionManager subMgr, ServerConfiguration cfg) {
         super(topicMgr, cfg);
@@ -83,7 +95,8 @@ public class SubscribeHandler extends Ba
         synchronized (channel) {
             TopicSubscriber topicSub = channel2sub.remove(channel);
             if (topicSub != null) {
-                sub2Channel.remove(topicSub);
+                // remove entry only currently mapped to given value.
+                sub2Channel.remove(topicSub, channel);
             }
         }
     }
@@ -106,6 +119,8 @@ public class SubscribeHandler extends Ba
         } catch (ServerNotResponsibleForTopicException e) {
             channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener(
                 ChannelFutureListener.CLOSE);
+            logger.error("Error getting current seq id for topic " + topic.toStringUtf8()
+                       + " when processing subscribe request (txnid:" + request.getTxnId() + ") :", e);
             subStats.incrementFailedOps();
             ServerStats.getInstance().incrementRequestsRedirect();
             return;
@@ -123,6 +138,8 @@ public class SubscribeHandler extends Ba
             public void operationFailed(Object ctx, PubSubException exception) {
                 channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener(
                     ChannelFutureListener.CLOSE);
+                logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: "
+                           + topic.toStringUtf8() + " , subscriber: " + subscriberId.toStringUtf8() + ")", exception);
                 subStats.incrementFailedOps();
             }
 
@@ -130,9 +147,6 @@ public class SubscribeHandler extends Ba
             public void operationFinished(Object ctx, SubscriptionData subData) {
 
                 TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
-
-                // race with channel getting disconnected while we are adding it
-                // to the 2 maps
                 synchronized (channel) {
                     if (!channel.isConnected()) {
                         // channel got disconnected while we were processing the
@@ -141,20 +155,6 @@ public class SubscribeHandler extends Ba
                         subStats.incrementFailedOps();
                         return;
                     }
-
-                    if (null != sub2Channel.putIfAbsent(topicSub, channel)) {
-                        // there was another channel mapped to this sub
-                        PubSubException pse = new PubSubException.TopicBusyException(
-                            "subscription for this topic, subscriberId is already being served on a different channel");
-                        channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
-                        .addListener(ChannelFutureListener.CLOSE);
-                        subStats.incrementFailedOps();
-                        return;
-                    } else {
-                        // channel2sub is just a cache, so we can add to it
-                        // without synchronization
-                        channel2sub.put(channel, topicSub);
-                    }
                 }
                 // initialize the message filter
                 PipelineFilter filter = new PipelineFilter();
@@ -192,7 +192,57 @@ public class SubscribeHandler extends Ba
                     .addListener(ChannelFutureListener.CLOSE);
                     return;
                 }
+                // race with channel getting disconnected while we are adding it
+                // to the 2 maps
+                synchronized (channel) {
+                    boolean forceAttach = false;
+                    if (subRequest.hasForceAttach()) {
+                        forceAttach = subRequest.getForceAttach();
+                    }
 
+                    Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+                    if (null != oldChannel) {
+                        boolean subSuccess = false;
+                        if (forceAttach) {
+                            // it is safe to close old channel here since new channel will be put
+                            // in sub2Channel / channel2Sub so there is no race between channel
+                            // getting disconnected and it.
+                            ChannelFuture future = oldChannel.close();
+                            future.addListener(CLOSE_OLD_CHANNEL_LISTENER);
+                            logger.info("New subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
+                                      + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
+                                      + " kills old channel " + oldChannel.getRemoteAddress());
+                            // try replace the oldChannel
+                            // if replace failure, it migth caused because channelDisconnect callback
+                            // has removed the old channel.
+                            if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
+                                // try to add it now.
+                                // if add failure, it means other one has obtained the channel
+                                oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+                                if (null == oldChannel) {
+                                    subSuccess = true;
+                                }
+                            } else {
+                                subSuccess = true;
+                            }
+                        }
+                        if (!subSuccess) {
+                            PubSubException pse = new PubSubException.TopicBusyException(
+                                "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
+                                + " is already being served on a different channel " + oldChannel.getRemoteAddress());
+                            subStats.incrementFailedOps();
+                            logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
+                                       + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
+                                       + " since it already being served on a different channel " + oldChannel.getRemoteAddress());
+                            channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
+                            .addListener(ChannelFutureListener.CLOSE);
+                            return;
+                        }
+                    }
+                    // channel2sub is just a cache, so we can add to it
+                    // without synchronization
+                    channel2sub.put(channel, topicSub);
+                }
                 // First write success and then tell the delivery manager,
                 // otherwise the first message might go out before the response
                 // to the subscribe