You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/14 18:16:18 UTC
svn commit: r1384836 [1/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/
hedwig-client/src/main/cpp/test/
hedwig-client/src/main/java/org/apache/hedwig/client/api/
hedwig-client/src/main/jav...
Author: ivank
Date: Fri Sep 14 16:16:17 2012
New Revision: 1384836
URL: http://svn.apache.org/viewvc?rev=1384836&view=rev
Log:
BOOKKEEPER-252: Hedwig: provide a subscription mode to kill other subscription channel when hedwig client is used as a proxy-style server. (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Sep 14 16:16:17 2012
@@ -158,6 +158,8 @@ Trunk (unreleased changes)
BOOKKEEPER-333: server-side message filter (sijie via ivank)
+ BOOKKEEPER-252: Hedwig: provide a subscription mode to kill other subscription channel when hedwig client is used as a proxy-style server. (sijie via ivank)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h Fri Sep 14 16:16:17 2012
@@ -30,6 +30,18 @@
namespace Hedwig {
+ //
+ // A Listener registered for a Subscriber instance to emit events
+ // for those disable resubscribe subscriptions.
+ //
+ class SubscriptionListener {
+ public:
+ virtual void processEvent(const std::string &topic, const std::string &subscriberId,
+ const Hedwig::SubscriptionEvent event) = 0;
+ virtual ~SubscriptionListener() {};
+ };
+ typedef std::tr1::shared_ptr<SubscriptionListener> SubscriptionListenerPtr;
+
template<class R>
class Callback {
public:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Fri Sep 14 16:16:17 2012
@@ -36,7 +36,7 @@ namespace Hedwig {
virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) = 0;
virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) = 0;
virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) = 0;
-
+
virtual void unsubscribe(const std::string& topic, const std::string& subscriberId) = 0;
virtual void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) = 0;
@@ -53,6 +53,14 @@ namespace Hedwig {
virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
+ //
+ // API to register/unregister subscription listeners for receiving
+ // events indicating subscription changes for those disable resubscribe
+ // subscriptions
+ //
+ virtual void addSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
+ virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
+
virtual ~Subscriber() {}
};
};
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Fri Sep 14 16:16:17 2012
@@ -134,6 +134,7 @@ const PubSubRequestPtr PubSubData::getRe
Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
subreq->set_subscriberid(subscriberid);
subreq->set_createorattach(options.createorattach());
+ subreq->set_forceattach(options.forceattach());
setPreferencesForSubRequest(subreq, options);
} else if (type == CONSUME) {
LOG4CXX_DEBUG(logger, "Creating consume request");
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Fri Sep 14 16:16:17 2012
@@ -147,7 +147,8 @@ void SubscriberReconnectCallback::operat
}
SubscriberClientChannelHandler::SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
- : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false), should_wait(true) {
+ : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false),
+ should_wait(true), disconnected(false) {
LOG4CXX_DEBUG(logger, "Creating SubscriberClientChannelHandler " << this);
}
@@ -178,7 +179,7 @@ void SubscriberClientChannelHandler::mes
void SubscriberClientChannelHandler::close() {
closed = true;
- if (channel) {
+ if (channel.get()) {
channel->kill();
}
}
@@ -190,7 +191,7 @@ void SubscriberClientChannelHandler::clo
return;
}
handler->should_wait = false;
- handler->channelDisconnected(channel, e);
+ handler->reconnect(channel, e);
}
void SubscriberClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) {
@@ -205,12 +206,40 @@ void SubscriberClientChannelHandler::cha
return;
}
+ {
+ boost::shared_lock<boost::shared_mutex> lock(disconnected_lock);
+ // some one is reconnecting return
+ if (disconnected) {
+ return;
+ }
+ disconnected = true;
+ }
+
+ // if we have registered the subscription channel listener, disable retry
+ // just trigger listener to let application handle channel disconnected event
+ if (!origData->getSubscriptionOptions().enableresubscribe()) {
+ LOG4CXX_INFO(logger, "Tell subscriber (topic:" << origData->getTopic()
+ << ", subscriberId:" << origData->getSubscriberId()
+ << ") his topic has been moved : channel "
+ << channel.get() << " is disconnected");
+ // remove record of the failed channel from the subscriber
+ client->getSubscriberImpl().closeSubscription(origData->getTopic(), origData->getSubscriberId());
+ // emit the event to notify the client that topic has been moved
+ client->getSubscriberImpl().emitSubscriptionEvent(
+ origData->getTopic(), origData->getSubscriberId(), TOPIC_MOVED);
+ } else {
+ reconnect(channel, e);
+ }
+}
+
+void SubscriberClientChannelHandler::reconnect(const DuplexChannelPtr& channel, const std::exception& e) {
if (should_wait) {
int retrywait = client->getConfiguration().getInt(Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
- boost::asio::deadline_timer t(channel->getService(), boost::posix_time::milliseconds(retrywait));
- t.async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(),
+ // set reconnect timer
+ reconnectTimer = ReconnectTimerPtr(new boost::asio::deadline_timer(channel->getService(), boost::posix_time::milliseconds(retrywait)));
+ reconnectTimer->async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(),
channel, e, boost::asio::placeholders::error));
return;
}
@@ -387,9 +416,12 @@ void SubscriberImpl::doUnsubscribe(const
void SubscriberImpl::consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) {
TopicSubscriber t(topic, subscriberId);
-
- boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
- SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+
+ SubscriberClientChannelHandlerPtr handler;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+ handler = topicsubscriber2handler[t];
+ }
if (handler.get() == 0) {
LOG4CXX_ERROR(logger, "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")");
@@ -399,6 +431,7 @@ void SubscriberImpl::consume(const std::
DuplexChannelPtr channel = handler->getChannel();
if (channel.get() == 0) {
LOG4CXX_ERROR(logger, "Trying to consume a message on a topic/subscriber pair that don't have a channel. Something fishy going on. Topic: " << topic << " SubscriberId: " << subscriberId << " MessageSeqId: " << messageSeqId.localcomponent());
+ return;
}
PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);
@@ -430,23 +463,31 @@ void SubscriberImpl::startDelivery(const
const MessageHandlerCallbackPtr& callback) {
TopicSubscriber t(topic, subscriberId);
- boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
- SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+ SubscriberClientChannelHandlerPtr handler;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+ handler = topicsubscriber2handler[t];
+ }
if (handler.get() == 0) {
LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
+ throw NotSubscribedException();
}
handler->startDelivery(callback);
}
void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& subscriberId) {
TopicSubscriber t(topic, subscriberId);
-
- boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
- SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+
+ SubscriberClientChannelHandlerPtr handler;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+ handler = topicsubscriber2handler[t];
+ }
if (handler.get() == 0) {
- LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
+ LOG4CXX_ERROR(logger, "Trying to stop deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
+ throw NotSubscribedException();
}
handler->stopDelivery();
}
@@ -484,6 +525,29 @@ const SubscriptionPreferencesPtr& Subscr
return preferences;
}
+void SubscriberImpl::addSubscriptionListener(SubscriptionListenerPtr& listener) {
+ boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+ listeners.insert(listener);
+}
+
+void SubscriberImpl::removeSubscriptionListener(SubscriptionListenerPtr& listener) {
+ boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+ listeners.erase(listener);
+}
+
+void SubscriberImpl::emitSubscriptionEvent(const std::string& topic,
+ const std::string& subscriberId,
+ const SubscriptionEvent event) {
+ boost::shared_lock<boost::shared_mutex> lock(listeners_lock);
+ if (0 == listeners.size()) {
+ return;
+ }
+ for (SubscriptionListenerSet::iterator iter = listeners.begin();
+ iter != listeners.end(); ++iter) {
+ (*iter)->processEvent(topic, subscriberId, event);
+ }
+}
+
/**
takes ownership of txn
*/
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Fri Sep 14 16:16:17 2012
@@ -121,6 +121,8 @@ namespace Hedwig {
void setChannel(const DuplexChannelPtr& channel);
DuplexChannelPtr& getChannel();
+ void reconnect(const DuplexChannelPtr& channel, const std::exception& e);
+
static void reconnectTimerComplete(const SubscriberClientChannelHandlerPtr handler, const DuplexChannelPtr channel, const std::exception e,
const boost::system::error_code& error);
@@ -135,7 +137,18 @@ namespace Hedwig {
PubSubDataPtr origData;
DuplexChannelPtr channel;
bool closed;
+
+ boost::shared_mutex disconnected_lock;
bool should_wait;
+ bool disconnected;
+ typedef boost::shared_ptr<boost::asio::deadline_timer> ReconnectTimerPtr;
+ ReconnectTimerPtr reconnectTimer;
+ };
+
+ struct SubscriptionListenerPtrHash : public std::unary_function<SubscriptionListenerPtr, size_t> {
+ size_t operator()(const Hedwig::SubscriptionListenerPtr& listener) const {
+ return reinterpret_cast<size_t>(listener.get());
+ }
};
class SubscriberImpl : public Subscriber {
@@ -167,6 +180,12 @@ namespace Hedwig {
void doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler);
void doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
+ virtual void addSubscriptionListener(SubscriptionListenerPtr& listener);
+ virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener);
+ void emitSubscriptionEvent(const std::string& topic,
+ const std::string& subscriberId,
+ const SubscriptionEvent event);
+
private:
void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
const SubscriptionPreferences& preferences);
@@ -180,6 +199,10 @@ namespace Hedwig {
boost::shared_mutex topicsubscriber2handler_lock;
std::tr1::unordered_map<TopicSubscriber, SubscriptionPreferencesPtr, TopicSubscriberHash> topicsubscriber2preferences;
boost::shared_mutex topicsubscriber2preferences_lock;
+
+ typedef std::tr1::unordered_set<SubscriptionListenerPtr, SubscriptionListenerPtrHash> SubscriptionListenerSet;
+ SubscriptionListenerSet listeners;
+ boost::shared_mutex listeners_lock;
};
};
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp Fri Sep 14 16:16:17 2012
@@ -164,3 +164,54 @@ TEST(SubscribeTest, testSubscribeTwice)
ASSERT_THROW(sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH), Hedwig::AlreadySubscribedException);
}
+TEST(SubscribeTest, testAsyncSubcribeForceAttach) {
+ Hedwig::Configuration* conf = new TestServerConfiguration();
+ std::auto_ptr<Hedwig::Configuration> confptr(conf);
+ // client 1
+ Hedwig::Client* client1 = new Hedwig::Client(*conf);
+ std::auto_ptr<Hedwig::Client> client1ptr(client1);
+ Hedwig::Subscriber& sub1 = client1->getSubscriber();
+ // client 2
+ Hedwig::Client* client2 = new Hedwig::Client(*conf);
+ std::auto_ptr<Hedwig::Client> client2ptr(client2);
+ Hedwig::Subscriber& sub2 = client2->getSubscriber();
+
+ SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+ std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
+ Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+
+ SimpleWaitCondition* lcond1 = new SimpleWaitCondition();
+ std::auto_ptr<SimpleWaitCondition> lcond1ptr(lcond1);
+ Hedwig::SubscriptionListenerPtr listener1(new TestSubscriptionListener(lcond1));
+
+ Hedwig::SubscriptionOptions options;
+ options.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+ options.set_forceattach(true);
+ options.set_enableresubscribe(false);
+
+ sub1.addSubscriptionListener(listener1);
+
+ sub1.asyncSubscribe("asyncSubscribeForceAttach", "mysub",
+ options, testcb1);
+ cond1->wait();
+ ASSERT_TRUE(cond1->wasSuccess());
+
+ // sub2 subscribe would force close the channel of sub1
+ SimpleWaitCondition* cond2 = new SimpleWaitCondition();
+ std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2);
+ Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2));
+
+ Hedwig::SubscriptionListenerPtr listener2(new TestSubscriptionListener(0));
+
+ sub2.addSubscriptionListener(listener2);
+
+ sub2.asyncSubscribe("asyncSubscribeForceAttach", "mysub",
+ options, testcb2);
+ cond2->wait();
+ ASSERT_TRUE(cond2->wasSuccess());
+
+ // sub1 would receive the disconnect event
+ lcond1->wait();
+
+ sub1.unsubscribe("asyncSubscribeForceAttach", "mysub");
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Fri Sep 14 16:16:17 2012
@@ -113,6 +113,26 @@ private:
SimpleWaitCondition *cond;
};
+class TestSubscriptionListener : public Hedwig::SubscriptionListener {
+public:
+ TestSubscriptionListener(SimpleWaitCondition* cond) : cond(cond) {
+ }
+
+ virtual ~TestSubscriptionListener() {}
+
+ virtual void processEvent(const std::string& topic, const std::string& subscriberId,
+ const Hedwig::SubscriptionEvent event) {
+ if (Hedwig::TOPIC_MOVED == event) {
+ if (cond) {
+ cond->setSuccess(true);
+ cond->notify();
+ }
+ }
+ }
+
+private:
+ SimpleWaitCondition *cond;
+};
class TestServerConfiguration : public Hedwig::Configuration {
public:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Fri Sep 14 16:16:17 2012
@@ -31,6 +31,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.SubscriptionListener;
/**
* Interface to define the client Subscriber API.
@@ -349,4 +350,21 @@ public interface Subscriber {
public void asyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<Void> callback,
Object context);
+ /**
+ * Register a subscription listener which get notified about subscription
+ * event indicating a state of a subscription that subscribed disable
+ * resubscribe logic.
+ *
+ * @param listener
+ * Subscription Listener
+ */
+ public void addSubscriptionListener(SubscriptionListener listener);
+
+ /**
+ * Unregister a subscription listener.
+ *
+ * @param listener
+ * Subscription Listener
+ */
+ public void removeSubscriptionListener(SubscriptionListener listener);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Fri Sep 14 16:16:17 2012
@@ -54,6 +54,10 @@ public class HedwigClientImpl implements
private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
+ // Empty Topic List
+ private ConcurrentLinkedQueue<ByteString> EMPTY_TOPIC_LIST =
+ new ConcurrentLinkedQueue<ByteString>();
+
// Global counter used for generating unique transaction ID's for
// publish and subscribe requests
protected final AtomicLong globalCounter = new AtomicLong();
@@ -373,6 +377,31 @@ public class HedwigClientImpl implements
}
}
+ // If a subscribe channel goes down, the topic might have moved.
+ // We only clear out that topic for the host and not all cached information.
+ public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clearing topic: " + topic.toStringUtf8() + " for host: "
+ + host);
+ }
+ if (topic2Host.remove(topic, host)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removed topic to host mapping for topic: " + topic.toStringUtf8()
+ + " and host: " + host);
+ }
+ }
+ ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
+ if (null != topicsForHost && topicsForHost.remove(topic)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removed topic: " + topic.toStringUtf8() + " from host: " + host);
+ }
+ if (topicsForHost.isEmpty()) {
+ // remove only topic list is empty
+ host2Topics.remove(host, EMPTY_TOPIC_LIST);
+ }
+ }
+ }
+
// Public getter to see if the client has been stopped.
public boolean hasStopped() {
return isStopped;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Fri Sep 14 16:16:17 2012
@@ -18,9 +18,12 @@
package org.apache.hedwig.client.netty;
import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.protocol.PubSubProtocol;
@@ -54,11 +57,13 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.SubscriptionListener;
/**
* This is the Hedwig Netty specific implementation of the Subscriber interface.
@@ -82,6 +87,8 @@ public class HedwigSubscriber implements
// user set when connection is recovered
protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler= new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
+ protected final CopyOnWriteArraySet<SubscriptionListener> listeners;
+
protected final HedwigClientImpl client;
protected final ClientConfiguration cfg;
private final Object closeLock = new Object();
@@ -90,6 +97,22 @@ public class HedwigSubscriber implements
public HedwigSubscriber(HedwigClientImpl client) {
this.client = client;
this.cfg = client.getConfiguration();
+ this.listeners = new CopyOnWriteArraySet<SubscriptionListener>();
+ }
+
+ public void addSubscriptionListener(SubscriptionListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeSubscriptionListener(SubscriptionListener listener) {
+ listeners.remove(listener);
+ }
+
+ void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event) {
+ for (SubscriptionListener listener : listeners) {
+ listener.processEvent(topic, subscriberId, event);
+ }
}
// Private method that holds the common logic for doing synchronous
@@ -424,6 +447,7 @@ public class HedwigSubscriber implements
SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
+ subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach());
// For now, all subscribes should wait for all cross-regional
// subscriptions to be established before returning.
subscribeRequestBuilder.setSynchronous(true);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Fri Sep 14 16:16:17 2012
@@ -47,7 +47,9 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.util.SubscriptionListener;
@ChannelPipelineCoverage("all")
public class ResponseHandler extends SimpleChannelHandler {
@@ -301,7 +303,9 @@ public class ResponseHandler extends Sim
// Subscribe channel disconnected so first close and clear all
// cached Channel data set up for this topic subscription.
sub.closeSubscription(origSubData.topic, origSubData.subscriberId);
- client.clearAllTopicsForHost(host);
+ // a subscription channel disconnecteda because topic has moved.
+ // just clear the entry for the given topic
+ client.clearHostForTopic(origSubData.topic, host);
// Since the connection to the server host that was responsible
// for the topic died, we are not sure about the state of that
// server. Resend the original subscribe request data to the default
@@ -309,14 +313,24 @@ public class ResponseHandler extends Sim
// contacted or attempted to from this request as we are starting a
// "fresh" subscribe request.
origSubData.clearServersList();
- // Set a new type of VoidCallback for this async call. We need this
- // hook so after the subscribe reconnect has completed, delivery for
- // that topic subscriber should also be restarted (if it was that
- // case before the channel disconnect).
- origSubData.setCallback(new SubscribeReconnectCallback(origSubData, client));
- origSubData.context = null;
- logger.debug("Disconnected subscribe channel so reconnect with origSubData: {}", origSubData);
- client.doConnect(origSubData, cfg.getDefaultServerHost());
+ // do resubscribe if the subscription enables it
+ if (origSubData.options.getEnableResubscribe()) {
+ // Set a new type of VoidCallback for this async call. We need this
+ // hook so after the subscribe reconnect has completed, delivery for
+ // that topic subscriber should also be restarted (if it was that
+ // case before the channel disconnect).
+ origSubData.setCallback(new SubscribeReconnectCallback(origSubData, client));
+ origSubData.context = null;
+ // Clear the shouldClaim flag
+ origSubData.shouldClaim = false;
+ logger.debug("Disconnected subscribe channel so reconnect with origSubData: {}", origSubData);
+ client.doConnect(origSubData, cfg.getDefaultServerHost());
+ } else {
+ logger.info("Subscription channel for (topic:{}, subscriber:{}) is disconnected.",
+ origSubData.topic.toStringUtf8(), origSubData.subscriberId.toStringUtf8());
+ sub.emitSubscriptionEvent(origSubData.topic, origSubData.subscriberId,
+ SubscriptionEvent.TOPIC_MOVED);
+ }
}
// Finally, all of the PubSubRequests that are still waiting for an ack
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java?rev=1384836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java Fri Sep 14 16:16:17 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.util;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+
+/**
+ * This class is used for subscriber to listen on subscription event.
+ */
+public interface SubscriptionListener {
+
+ /**
+ * Process an event from a subscription.
+ * <p>
+ * NOTE: It would be better to not run blocking operations in a
+ * listener implementation.
+ * </p>
+ *
+ * @param topic
+ * Topic Name
+ * @param subscriberId
+ * Subscriber Id
+ * @param event
+ * Event tell what happened to the subscription.
+ */
+ public void processEvent(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event);
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Fri Sep 14 16:16:17 2012
@@ -155,6 +155,75 @@ public final class PubSubProtocol {
// @@protoc_insertion_point(enum_scope:Hedwig.OperationType)
}
+ public enum SubscriptionEvent
+ implements com.google.protobuf.ProtocolMessageEnum {
+ TOPIC_MOVED(0, 1),
+ SUBSCRIPTION_FORCED_CLOSED(1, 2),
+ ;
+
+ public static final int TOPIC_MOVED_VALUE = 1;
+ public static final int SUBSCRIPTION_FORCED_CLOSED_VALUE = 2;
+
+
+ public final int getNumber() { return value; }
+
+ public static SubscriptionEvent valueOf(int value) {
+ switch (value) {
+ case 1: return TOPIC_MOVED;
+ case 2: return SUBSCRIPTION_FORCED_CLOSED;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<SubscriptionEvent>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<SubscriptionEvent>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<SubscriptionEvent>() {
+ public SubscriptionEvent findValueByNumber(int number) {
+ return SubscriptionEvent.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.hedwig.protocol.PubSubProtocol.getDescriptor().getEnumTypes().get(2);
+ }
+
+ private static final SubscriptionEvent[] VALUES = {
+ TOPIC_MOVED, SUBSCRIPTION_FORCED_CLOSED,
+ };
+
+ public static SubscriptionEvent valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private SubscriptionEvent(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:Hedwig.SubscriptionEvent)
+ }
+
public enum StatusCode
implements com.google.protobuf.ProtocolMessageEnum {
SUCCESS(0, 0),
@@ -251,7 +320,7 @@ public final class PubSubProtocol {
}
public static final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptor() {
- return org.apache.hedwig.protocol.PubSubProtocol.getDescriptor().getEnumTypes().get(2);
+ return org.apache.hedwig.protocol.PubSubProtocol.getDescriptor().getEnumTypes().get(3);
}
private static final StatusCode[] VALUES = {
@@ -6218,6 +6287,10 @@ public final class PubSubProtocol {
boolean hasPreferences();
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences();
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder();
+
+ // optional bool forceAttach = 7 [default = false];
+ boolean hasForceAttach();
+ boolean getForceAttach();
}
public static final class SubscribeRequest extends
com.google.protobuf.GeneratedMessage
@@ -6373,12 +6446,23 @@ public final class PubSubProtocol {
return preferences_;
}
+ // optional bool forceAttach = 7 [default = false];
+ public static final int FORCEATTACH_FIELD_NUMBER = 7;
+ private boolean forceAttach_;
+ public boolean hasForceAttach() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getForceAttach() {
+ return forceAttach_;
+ }
+
private void initFields() {
subscriberId_ = com.google.protobuf.ByteString.EMPTY;
createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
synchronous_ = false;
messageBound_ = 0;
preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+ forceAttach_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6411,6 +6495,9 @@ public final class PubSubProtocol {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(6, preferences_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBool(7, forceAttach_);
+ }
getUnknownFields().writeTo(output);
}
@@ -6440,6 +6527,10 @@ public final class PubSubProtocol {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(6, preferences_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(7, forceAttach_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -6579,6 +6670,8 @@ public final class PubSubProtocol {
preferencesBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000010);
+ forceAttach_ = false;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -6641,6 +6734,10 @@ public final class PubSubProtocol {
} else {
result.preferences_ = preferencesBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.forceAttach_ = forceAttach_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -6672,6 +6769,9 @@ public final class PubSubProtocol {
if (other.hasPreferences()) {
mergePreferences(other.getPreferences());
}
+ if (other.hasForceAttach()) {
+ setForceAttach(other.getForceAttach());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6742,6 +6842,11 @@ public final class PubSubProtocol {
setPreferences(subBuilder.buildPartial());
break;
}
+ case 56: {
+ bitField0_ |= 0x00000020;
+ forceAttach_ = input.readBool();
+ break;
+ }
}
}
}
@@ -6928,6 +7033,27 @@ public final class PubSubProtocol {
return preferencesBuilder_;
}
+ // optional bool forceAttach = 7 [default = false];
+ private boolean forceAttach_ ;
+ public boolean hasForceAttach() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getForceAttach() {
+ return forceAttach_;
+ }
+ public Builder setForceAttach(boolean value) {
+ bitField0_ |= 0x00000020;
+ forceAttach_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearForceAttach() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ forceAttach_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Hedwig.SubscribeRequest)
}
@@ -6942,6 +7068,10 @@ public final class PubSubProtocol {
public interface SubscriptionOptionsOrBuilder
extends com.google.protobuf.MessageOrBuilder {
+ // optional bool forceAttach = 1 [default = false];
+ boolean hasForceAttach();
+ boolean getForceAttach();
+
// optional .Hedwig.SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
boolean hasCreateOrAttach();
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach getCreateOrAttach();
@@ -6958,6 +7088,10 @@ public final class PubSubProtocol {
// optional string messageFilter = 5;
boolean hasMessageFilter();
String getMessageFilter();
+
+ // optional bool enableResubscribe = 7 [default = true];
+ boolean hasEnableResubscribe();
+ boolean getEnableResubscribe();
}
public static final class SubscriptionOptions extends
com.google.protobuf.GeneratedMessage
@@ -6988,11 +7122,21 @@ public final class PubSubProtocol {
}
private int bitField0_;
+ // optional bool forceAttach = 1 [default = false];
+ public static final int FORCEATTACH_FIELD_NUMBER = 1;
+ private boolean forceAttach_;
+ public boolean hasForceAttach() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public boolean getForceAttach() {
+ return forceAttach_;
+ }
+
// optional .Hedwig.SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
public static final int CREATEORATTACH_FIELD_NUMBER = 2;
private org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach_;
public boolean hasCreateOrAttach() {
- return ((bitField0_ & 0x00000001) == 0x00000001);
+ return ((bitField0_ & 0x00000002) == 0x00000002);
}
public org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach getCreateOrAttach() {
return createOrAttach_;
@@ -7002,7 +7146,7 @@ public final class PubSubProtocol {
public static final int MESSAGEBOUND_FIELD_NUMBER = 3;
private int messageBound_;
public boolean hasMessageBound() {
- return ((bitField0_ & 0x00000002) == 0x00000002);
+ return ((bitField0_ & 0x00000004) == 0x00000004);
}
public int getMessageBound() {
return messageBound_;
@@ -7012,7 +7156,7 @@ public final class PubSubProtocol {
public static final int OPTIONS_FIELD_NUMBER = 4;
private org.apache.hedwig.protocol.PubSubProtocol.Map options_;
public boolean hasOptions() {
- return ((bitField0_ & 0x00000004) == 0x00000004);
+ return ((bitField0_ & 0x00000008) == 0x00000008);
}
public org.apache.hedwig.protocol.PubSubProtocol.Map getOptions() {
return options_;
@@ -7025,7 +7169,7 @@ public final class PubSubProtocol {
public static final int MESSAGEFILTER_FIELD_NUMBER = 5;
private java.lang.Object messageFilter_;
public boolean hasMessageFilter() {
- return ((bitField0_ & 0x00000008) == 0x00000008);
+ return ((bitField0_ & 0x00000010) == 0x00000010);
}
public String getMessageFilter() {
java.lang.Object ref = messageFilter_;
@@ -7053,11 +7197,23 @@ public final class PubSubProtocol {
}
}
+ // optional bool enableResubscribe = 7 [default = true];
+ public static final int ENABLERESUBSCRIBE_FIELD_NUMBER = 7;
+ private boolean enableResubscribe_;
+ public boolean hasEnableResubscribe() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getEnableResubscribe() {
+ return enableResubscribe_;
+ }
+
private void initFields() {
+ forceAttach_ = false;
createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
messageBound_ = 0;
options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
messageFilter_ = "";
+ enableResubscribe_ = true;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7072,17 +7228,23 @@ public final class PubSubProtocol {
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
- output.writeEnum(2, createOrAttach_.getNumber());
+ output.writeBool(1, forceAttach_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
- output.writeUInt32(3, messageBound_);
+ output.writeEnum(2, createOrAttach_.getNumber());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
- output.writeMessage(4, options_);
+ output.writeUInt32(3, messageBound_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeMessage(4, options_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBytes(5, getMessageFilterBytes());
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBool(7, enableResubscribe_);
+ }
getUnknownFields().writeTo(output);
}
@@ -7094,20 +7256,28 @@ public final class PubSubProtocol {
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
- .computeEnumSize(2, createOrAttach_.getNumber());
+ .computeBoolSize(1, forceAttach_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
- .computeUInt32Size(3, messageBound_);
+ .computeEnumSize(2, createOrAttach_.getNumber());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
- .computeMessageSize(4, options_);
+ .computeUInt32Size(3, messageBound_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(4, options_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
.computeBytesSize(5, getMessageFilterBytes());
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(7, enableResubscribe_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -7233,18 +7403,22 @@ public final class PubSubProtocol {
public Builder clear() {
super.clear();
- createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
+ forceAttach_ = false;
bitField0_ = (bitField0_ & ~0x00000001);
- messageBound_ = 0;
+ createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
bitField0_ = (bitField0_ & ~0x00000002);
+ messageBound_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000004);
if (optionsBuilder_ == null) {
options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
} else {
optionsBuilder_.clear();
}
- bitField0_ = (bitField0_ & ~0x00000004);
- messageFilter_ = "";
bitField0_ = (bitField0_ & ~0x00000008);
+ messageFilter_ = "";
+ bitField0_ = (bitField0_ & ~0x00000010);
+ enableResubscribe_ = true;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -7286,23 +7460,31 @@ public final class PubSubProtocol {
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
- result.createOrAttach_ = createOrAttach_;
+ result.forceAttach_ = forceAttach_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
- result.messageBound_ = messageBound_;
+ result.createOrAttach_ = createOrAttach_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
+ result.messageBound_ = messageBound_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
if (optionsBuilder_ == null) {
result.options_ = options_;
} else {
result.options_ = optionsBuilder_.build();
}
- if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
- to_bitField0_ |= 0x00000008;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
}
result.messageFilter_ = messageFilter_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.enableResubscribe_ = enableResubscribe_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -7319,6 +7501,9 @@ public final class PubSubProtocol {
public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions other) {
if (other == org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.getDefaultInstance()) return this;
+ if (other.hasForceAttach()) {
+ setForceAttach(other.getForceAttach());
+ }
if (other.hasCreateOrAttach()) {
setCreateOrAttach(other.getCreateOrAttach());
}
@@ -7331,6 +7516,9 @@ public final class PubSubProtocol {
if (other.hasMessageFilter()) {
setMessageFilter(other.getMessageFilter());
}
+ if (other.hasEnableResubscribe()) {
+ setEnableResubscribe(other.getEnableResubscribe());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -7362,19 +7550,24 @@ public final class PubSubProtocol {
}
break;
}
+ case 8: {
+ bitField0_ |= 0x00000001;
+ forceAttach_ = input.readBool();
+ break;
+ }
case 16: {
int rawValue = input.readEnum();
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach value = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.valueOf(rawValue);
if (value == null) {
unknownFields.mergeVarintField(2, rawValue);
} else {
- bitField0_ |= 0x00000001;
+ bitField0_ |= 0x00000002;
createOrAttach_ = value;
}
break;
}
case 24: {
- bitField0_ |= 0x00000002;
+ bitField0_ |= 0x00000004;
messageBound_ = input.readUInt32();
break;
}
@@ -7388,20 +7581,46 @@ public final class PubSubProtocol {
break;
}
case 42: {
- bitField0_ |= 0x00000008;
+ bitField0_ |= 0x00000010;
messageFilter_ = input.readBytes();
break;
}
+ case 56: {
+ bitField0_ |= 0x00000020;
+ enableResubscribe_ = input.readBool();
+ break;
+ }
}
}
}
private int bitField0_;
+ // optional bool forceAttach = 1 [default = false];
+ private boolean forceAttach_ ;
+ public boolean hasForceAttach() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public boolean getForceAttach() {
+ return forceAttach_;
+ }
+ public Builder setForceAttach(boolean value) {
+ bitField0_ |= 0x00000001;
+ forceAttach_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearForceAttach() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ forceAttach_ = false;
+ onChanged();
+ return this;
+ }
+
// optional .Hedwig.SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
private org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
public boolean hasCreateOrAttach() {
- return ((bitField0_ & 0x00000001) == 0x00000001);
+ return ((bitField0_ & 0x00000002) == 0x00000002);
}
public org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach getCreateOrAttach() {
return createOrAttach_;
@@ -7410,13 +7629,13 @@ public final class PubSubProtocol {
if (value == null) {
throw new NullPointerException();
}
- bitField0_ |= 0x00000001;
+ bitField0_ |= 0x00000002;
createOrAttach_ = value;
onChanged();
return this;
}
public Builder clearCreateOrAttach() {
- bitField0_ = (bitField0_ & ~0x00000001);
+ bitField0_ = (bitField0_ & ~0x00000002);
createOrAttach_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH;
onChanged();
return this;
@@ -7425,19 +7644,19 @@ public final class PubSubProtocol {
// optional uint32 messageBound = 3 [default = 0];
private int messageBound_ ;
public boolean hasMessageBound() {
- return ((bitField0_ & 0x00000002) == 0x00000002);
+ return ((bitField0_ & 0x00000004) == 0x00000004);
}
public int getMessageBound() {
return messageBound_;
}
public Builder setMessageBound(int value) {
- bitField0_ |= 0x00000002;
+ bitField0_ |= 0x00000004;
messageBound_ = value;
onChanged();
return this;
}
public Builder clearMessageBound() {
- bitField0_ = (bitField0_ & ~0x00000002);
+ bitField0_ = (bitField0_ & ~0x00000004);
messageBound_ = 0;
onChanged();
return this;
@@ -7448,7 +7667,7 @@ public final class PubSubProtocol {
private com.google.protobuf.SingleFieldBuilder<
org.apache.hedwig.protocol.PubSubProtocol.Map, org.apache.hedwig.protocol.PubSubProtocol.Map.Builder, org.apache.hedwig.protocol.PubSubProtocol.MapOrBuilder> optionsBuilder_;
public boolean hasOptions() {
- return ((bitField0_ & 0x00000004) == 0x00000004);
+ return ((bitField0_ & 0x00000008) == 0x00000008);
}
public org.apache.hedwig.protocol.PubSubProtocol.Map getOptions() {
if (optionsBuilder_ == null) {
@@ -7467,7 +7686,7 @@ public final class PubSubProtocol {
} else {
optionsBuilder_.setMessage(value);
}
- bitField0_ |= 0x00000004;
+ bitField0_ |= 0x00000008;
return this;
}
public Builder setOptions(
@@ -7478,12 +7697,12 @@ public final class PubSubProtocol {
} else {
optionsBuilder_.setMessage(builderForValue.build());
}
- bitField0_ |= 0x00000004;
+ bitField0_ |= 0x00000008;
return this;
}
public Builder mergeOptions(org.apache.hedwig.protocol.PubSubProtocol.Map value) {
if (optionsBuilder_ == null) {
- if (((bitField0_ & 0x00000004) == 0x00000004) &&
+ if (((bitField0_ & 0x00000008) == 0x00000008) &&
options_ != org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance()) {
options_ =
org.apache.hedwig.protocol.PubSubProtocol.Map.newBuilder(options_).mergeFrom(value).buildPartial();
@@ -7494,7 +7713,7 @@ public final class PubSubProtocol {
} else {
optionsBuilder_.mergeFrom(value);
}
- bitField0_ |= 0x00000004;
+ bitField0_ |= 0x00000008;
return this;
}
public Builder clearOptions() {
@@ -7504,11 +7723,11 @@ public final class PubSubProtocol {
} else {
optionsBuilder_.clear();
}
- bitField0_ = (bitField0_ & ~0x00000004);
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
public org.apache.hedwig.protocol.PubSubProtocol.Map.Builder getOptionsBuilder() {
- bitField0_ |= 0x00000004;
+ bitField0_ |= 0x00000008;
onChanged();
return getOptionsFieldBuilder().getBuilder();
}
@@ -7536,7 +7755,7 @@ public final class PubSubProtocol {
// optional string messageFilter = 5;
private java.lang.Object messageFilter_ = "";
public boolean hasMessageFilter() {
- return ((bitField0_ & 0x00000008) == 0x00000008);
+ return ((bitField0_ & 0x00000010) == 0x00000010);
}
public String getMessageFilter() {
java.lang.Object ref = messageFilter_;
@@ -7552,23 +7771,44 @@ public final class PubSubProtocol {
if (value == null) {
throw new NullPointerException();
}
- bitField0_ |= 0x00000008;
+ bitField0_ |= 0x00000010;
messageFilter_ = value;
onChanged();
return this;
}
public Builder clearMessageFilter() {
- bitField0_ = (bitField0_ & ~0x00000008);
+ bitField0_ = (bitField0_ & ~0x00000010);
messageFilter_ = getDefaultInstance().getMessageFilter();
onChanged();
return this;
}
void setMessageFilter(com.google.protobuf.ByteString value) {
- bitField0_ |= 0x00000008;
+ bitField0_ |= 0x00000010;
messageFilter_ = value;
onChanged();
}
+ // optional bool enableResubscribe = 7 [default = true];
+ private boolean enableResubscribe_ = true;
+ public boolean hasEnableResubscribe() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public boolean getEnableResubscribe() {
+ return enableResubscribe_;
+ }
+ public Builder setEnableResubscribe(boolean value) {
+ bitField0_ |= 0x00000020;
+ enableResubscribe_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearEnableResubscribe() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ enableResubscribe_ = true;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Hedwig.SubscriptionOptions)
}
@@ -15144,67 +15384,71 @@ public final class PubSubProtocol {
"yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
"\017.Hedwig.Message\"d\n\027SubscriptionPreferen" +
"ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
- "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\"\243" +
+ "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\"\277" +
"\002\n\020SubscribeRequest\022\024\n\014subscriberId\030\002 \002(" +
"\014\022Q\n\016createOrAttach\030\003 \001(\0162\'.Hedwig.Subsc",
"ribeRequest.CreateOrAttach:\020CREATE_OR_AT" +
"TACH\022\032\n\013synchronous\030\004 \001(\010:\005false\022\024\n\014mess" +
"ageBound\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.He" +
- "dwig.SubscriptionPreferences\">\n\016CreateOr" +
- "Attach\022\n\n\006CREATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CREATE" +
- "_OR_ATTACH\020\002\"\266\001\n\023SubscriptionOptions\022Q\n\016" +
- "createOrAttach\030\002 \001(\0162\'.Hedwig.SubscribeR" +
- "equest.CreateOrAttach:\020CREATE_OR_ATTACH\022" +
- "\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007options\030\004 \001(" +
- "\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030\005 \001(\t\"K\n",
- "\016ConsumeRequest\022\024\n\014subscriberId\030\002 \002(\014\022#\n" +
- "\005msgId\030\003 \002(\0132\024.Hedwig.MessageSeqId\"*\n\022Un" +
- "subscribeRequest\022\024\n\014subscriberId\030\002 \002(\014\"+" +
- "\n\023StopDeliveryRequest\022\024\n\014subscriberId\030\002 " +
- "\002(\014\",\n\024StartDeliveryRequest\022\024\n\014subscribe" +
- "rId\030\002 \002(\014\"\377\001\n\016PubSubResponse\0220\n\017protocol" +
- "Version\030\001 \002(\0162\027.Hedwig.ProtocolVersion\022&" +
- "\n\nstatusCode\030\002 \002(\0162\022.Hedwig.StatusCode\022\r" +
- "\n\005txnId\030\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(\t\022 \n\007mess" +
- "age\030\005 \001(\0132\017.Hedwig.Message\022\r\n\005topic\030\006 \001(",
- "\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014responseBody\030" +
- "\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017PublishRe" +
- "sponse\022,\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig." +
- "MessageSeqId\"I\n\021SubscribeResponse\0224\n\013pre" +
- "ferences\030\002 \001(\0132\037.Hedwig.SubscriptionPref" +
- "erences\"v\n\014ResponseBody\0220\n\017publishRespon" +
- "se\030\001 \001(\0132\027.Hedwig.PublishResponse\0224\n\021sub" +
- "scribeResponse\030\002 \001(\0132\031.Hedwig.SubscribeR" +
- "esponse\"N\n\021SubscriptionState\022#\n\005msgId\030\001 " +
- "\002(\0132\024.Hedwig.MessageSeqId\022\024\n\014messageBoun",
- "d\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005state\030\001 \001" +
- "(\0132\031.Hedwig.SubscriptionState\0224\n\013prefere" +
- "nces\030\002 \001(\0132\037.Hedwig.SubscriptionPreferen" +
- "ces\"O\n\013LedgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020" +
- "endSeqIdIncluded\030\002 \001(\0132\024.Hedwig.MessageS" +
- "eqId\"3\n\014LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.H" +
- "edwig.LedgerRange\":\n\013ManagerMeta\022\023\n\013mana" +
- "gerImpl\030\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n" +
- "\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030" +
- "\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*",
- "\"\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020\001*p\n\rO" +
- "perationType\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001" +
- "\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_" +
- "DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*\205\004\n\nStatus" +
- "Code\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221" +
- "\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_" +
- "SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224" +
- "\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020" +
- "\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SE" +
- "RVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026I",
- "NVALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020" +
- "\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TO" +
- "PIC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUB" +
- "SCRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE" +
- "_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027" +
- "TOPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED" +
- "_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apa" +
- "che.hedwig.protocolH\001"
+ "dwig.SubscriptionPreferences\022\032\n\013forceAtt" +
+ "ach\030\007 \001(\010:\005false\">\n\016CreateOrAttach\022\n\n\006CR" +
+ "EATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002" +
+ "\"\363\001\n\023SubscriptionOptions\022\032\n\013forceAttach\030" +
+ "\001 \001(\010:\005false\022Q\n\016createOrAttach\030\002 \001(\0162\'.H" +
+ "edwig.SubscribeRequest.CreateOrAttach:\020C" +
+ "REATE_OR_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010",
+ "\022\034\n\007options\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessag" +
+ "eFilter\030\005 \001(\t\022\037\n\021enableResubscribe\030\007 \001(\010" +
+ ":\004true\"K\n\016ConsumeRequest\022\024\n\014subscriberId" +
+ "\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.Hedwig.MessageSe" +
+ "qId\"*\n\022UnsubscribeRequest\022\024\n\014subscriberI" +
+ "d\030\002 \002(\014\"+\n\023StopDeliveryRequest\022\024\n\014subscr" +
+ "iberId\030\002 \002(\014\",\n\024StartDeliveryRequest\022\024\n\014" +
+ "subscriberId\030\002 \002(\014\"\377\001\n\016PubSubResponse\0220\n" +
+ "\017protocolVersion\030\001 \002(\0162\027.Hedwig.Protocol" +
+ "Version\022&\n\nstatusCode\030\002 \002(\0162\022.Hedwig.Sta",
+ "tusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(" +
+ "\t\022 \n\007message\030\005 \001(\0132\017.Hedwig.Message\022\r\n\005t" +
+ "opic\030\006 \001(\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014resp" +
+ "onseBody\030\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017" +
+ "PublishResponse\022,\n\016publishedMsgId\030\001 \002(\0132" +
+ "\024.Hedwig.MessageSeqId\"I\n\021SubscribeRespon" +
+ "se\0224\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscri" +
+ "ptionPreferences\"v\n\014ResponseBody\0220\n\017publ" +
+ "ishResponse\030\001 \001(\0132\027.Hedwig.PublishRespon" +
+ "se\0224\n\021subscribeResponse\030\002 \001(\0132\031.Hedwig.S",
+ "ubscribeResponse\"N\n\021SubscriptionState\022#\n" +
+ "\005msgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\022\024\n\014me" +
+ "ssageBound\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005" +
+ "state\030\001 \001(\0132\031.Hedwig.SubscriptionState\0224" +
+ "\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscriptio" +
+ "nPreferences\"O\n\013LedgerRange\022\020\n\010ledgerId\030" +
+ "\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 \001(\0132\024.Hedwig" +
+ ".MessageSeqId\"3\n\014LedgerRanges\022#\n\006ranges\030" +
+ "\001 \003(\0132\023.Hedwig.LedgerRange\":\n\013ManagerMet" +
+ "a\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016managerVersion",
+ "\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022" +
+ "\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopi" +
+ "cs\030\002 \002(\004*\"\n\017ProtocolVersion\022\017\n\013VERSION_O" +
+ "NE\020\001*p\n\rOperationType\022\013\n\007PUBLISH\020\000\022\r\n\tSU" +
+ "BSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022" +
+ "\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*D" +
+ "\n\021SubscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032" +
+ "SUBSCRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusC" +
+ "ode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003" +
+ "\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_S",
+ "UBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003" +
+ "\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226" +
+ "\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SER" +
+ "VICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026IN" +
+ "VALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210" +
+ "\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOP" +
+ "IC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBS" +
+ "CRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_" +
+ "EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027T" +
+ "OPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_",
+ "CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apac" +
+ "he.hedwig.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15288,7 +15532,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_SubscribeRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscribeRequest_descriptor,
- new java.lang.String[] { "SubscriberId", "CreateOrAttach", "Synchronous", "MessageBound", "Preferences", },
+ new java.lang.String[] { "SubscriberId", "CreateOrAttach", "Synchronous", "MessageBound", "Preferences", "ForceAttach", },
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.Builder.class);
internal_static_Hedwig_SubscriptionOptions_descriptor =
@@ -15296,7 +15540,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_SubscriptionOptions_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionOptions_descriptor,
- new java.lang.String[] { "CreateOrAttach", "MessageBound", "Options", "MessageFilter", },
+ new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "EnableResubscribe", },
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.Builder.class);
internal_static_Hedwig_ConsumeRequest_descriptor =
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Fri Sep 14 16:16:17 2012
@@ -136,17 +136,26 @@ message SubscribeRequest{
// subscription options
optional SubscriptionPreferences preferences = 6;
+
+ // force attach subscription which would kill existed channel
+ // this option doesn't need to be persisted
+ optional bool forceAttach = 7 [default = false];
}
// used in client only
// options are stored in SubscriptionPreferences structure
message SubscriptionOptions {
+ // force attach subscription which would kill existed channel
+ // this option doesn't need to be persisted
+ optional bool forceAttach = 1 [default = false];
optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
optional uint32 messageBound = 3 [default = 0];
// user customized subscription options
optional Map options = 4;
// server-side message filter
optional string messageFilter = 5;
+ // enable resubscribe
+ optional bool enableResubscribe = 7 [default = true];
}
message ConsumeRequest{
@@ -168,6 +177,14 @@ message StartDeliveryRequest{
required bytes subscriberId = 2;
}
+// Identify an event happened for a subscription
+enum SubscriptionEvent {
+ // topic has changed ownership (hub server down or topic released)
+ TOPIC_MOVED = 1;
+ // subscription is force closed by other subscribers
+ SUBSCRIPTION_FORCED_CLOSED = 2;
+}
+
message PubSubResponse{
required ProtocolVersion protocolVersion = 1;
required StatusCode statusCode = 2;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Fri Sep 14 16:16:17 2012
@@ -50,11 +50,14 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.HubInfo;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.util.SubscriptionListener;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -238,6 +241,15 @@ public class HedwigConsole {
}
+ static class ConsoleSubscriptionListener implements SubscriptionListener {
+
+ @Override
+ public void processEvent(ByteString t, ByteString s, SubscriptionEvent event) {
+ System.out.println("Subscription Channel for (topic:" + t.toStringUtf8() + ", subscriber:"
+ + s.toStringUtf8() + ") received event : " + event);
+ }
+ }
+
class SubCmd implements MyCommand {
@Override
@@ -271,7 +283,10 @@ public class HedwigConsole {
ByteString topic = ByteString.copyFromUtf8(args[1]);
ByteString subId = ByteString.copyFromUtf8(args[2]);
try {
- subscriber.subscribe(topic, subId, mode);
+ SubscriptionOptions options =
+ SubscriptionOptions.newBuilder().setCreateOrAttach(mode)
+ .setForceAttach(false).build();
+ subscriber.subscribe(topic, subId, options);
if (receive) {
subscriber.startDelivery(topic, subId, consoleHandler);
System.out.println("SUB DONE AND RECEIVE");
@@ -400,6 +415,7 @@ public class HedwigConsole {
+ " subscriber: " + args[2]);
return true;
}
+ lastConsumedId = subData.getState().getMsgId().getLocalComponent();
long numMessagesToConsume = Long.parseLong(args[3]);
long idToConsumed = lastConsumedId + numMessagesToConsume;
System.out.println("Try to move subscriber(" + args[2] + ") consume ptr of topic(" + args[1]
@@ -852,6 +868,7 @@ public class HedwigConsole {
hubClient = new HedwigClient(hubClientCfg);
publisher = hubClient.getPublisher();
subscriber = hubClient.getSubscriber();
+ subscriber.addSubscriptionListener(new ConsoleSubscriptionListener());
// other parameters
myRegion = hubServerConf.getMyRegion();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Fri Sep 14 16:16:17 2012
@@ -314,6 +314,10 @@ public class FIFODeliveryManager impleme
}
public void setNotConnected() {
+ // have closed it.
+ if (!isConnected()) {
+ return;
+ }
this.connected = false;
deliveryEndPoint.close();
// uninitialize filter
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Fri Sep 14 16:16:17 2012
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHa
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import com.google.protobuf.ByteString;
@@ -65,6 +66,17 @@ public class SubscribeHandler extends Ba
// op stats
private final OpStats subStats;
+ private static ChannelFutureListener CLOSE_OLD_CHANNEL_LISTENER = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.warn("Failed to close old subscription channel.");
+ } else {
+ logger.debug("Close old subscription channel succeed.");
+ }
+ }
+ };
+
public SubscribeHandler(TopicManager topicMgr, DeliveryManager deliveryManager, PersistenceManager persistenceMgr,
SubscriptionManager subMgr, ServerConfiguration cfg) {
super(topicMgr, cfg);
@@ -83,7 +95,8 @@ public class SubscribeHandler extends Ba
synchronized (channel) {
TopicSubscriber topicSub = channel2sub.remove(channel);
if (topicSub != null) {
- sub2Channel.remove(topicSub);
+ // remove entry only currently mapped to given value.
+ sub2Channel.remove(topicSub, channel);
}
}
}
@@ -106,6 +119,8 @@ public class SubscribeHandler extends Ba
} catch (ServerNotResponsibleForTopicException e) {
channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener(
ChannelFutureListener.CLOSE);
+ logger.error("Error getting current seq id for topic " + topic.toStringUtf8()
+ + " when processing subscribe request (txnid:" + request.getTxnId() + ") :", e);
subStats.incrementFailedOps();
ServerStats.getInstance().incrementRequestsRedirect();
return;
@@ -123,6 +138,8 @@ public class SubscribeHandler extends Ba
public void operationFailed(Object ctx, PubSubException exception) {
channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener(
ChannelFutureListener.CLOSE);
+ logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: "
+ + topic.toStringUtf8() + " , subscriber: " + subscriberId.toStringUtf8() + ")", exception);
subStats.incrementFailedOps();
}
@@ -130,9 +147,6 @@ public class SubscribeHandler extends Ba
public void operationFinished(Object ctx, SubscriptionData subData) {
TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
-
- // race with channel getting disconnected while we are adding it
- // to the 2 maps
synchronized (channel) {
if (!channel.isConnected()) {
// channel got disconnected while we were processing the
@@ -141,20 +155,6 @@ public class SubscribeHandler extends Ba
subStats.incrementFailedOps();
return;
}
-
- if (null != sub2Channel.putIfAbsent(topicSub, channel)) {
- // there was another channel mapped to this sub
- PubSubException pse = new PubSubException.TopicBusyException(
- "subscription for this topic, subscriberId is already being served on a different channel");
- channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
- .addListener(ChannelFutureListener.CLOSE);
- subStats.incrementFailedOps();
- return;
- } else {
- // channel2sub is just a cache, so we can add to it
- // without synchronization
- channel2sub.put(channel, topicSub);
- }
}
// initialize the message filter
PipelineFilter filter = new PipelineFilter();
@@ -192,7 +192,57 @@ public class SubscribeHandler extends Ba
.addListener(ChannelFutureListener.CLOSE);
return;
}
+ // race with channel getting disconnected while we are adding it
+ // to the 2 maps
+ synchronized (channel) {
+ boolean forceAttach = false;
+ if (subRequest.hasForceAttach()) {
+ forceAttach = subRequest.getForceAttach();
+ }
+ Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+ if (null != oldChannel) {
+ boolean subSuccess = false;
+ if (forceAttach) {
+ // it is safe to close old channel here since new channel will be put
+ // in sub2Channel / channel2Sub so there is no race between channel
+ // getting disconnected and it.
+ ChannelFuture future = oldChannel.close();
+ future.addListener(CLOSE_OLD_CHANNEL_LISTENER);
+ logger.info("New subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
+ + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
+ + " kills old channel " + oldChannel.getRemoteAddress());
+ // try replace the oldChannel
+ // if replace failure, it migth caused because channelDisconnect callback
+ // has removed the old channel.
+ if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
+ // try to add it now.
+ // if add failure, it means other one has obtained the channel
+ oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+ if (null == oldChannel) {
+ subSuccess = true;
+ }
+ } else {
+ subSuccess = true;
+ }
+ }
+ if (!subSuccess) {
+ PubSubException pse = new PubSubException.TopicBusyException(
+ "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
+ + " is already being served on a different channel " + oldChannel.getRemoteAddress());
+ subStats.incrementFailedOps();
+ logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
+ + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
+ + " since it already being served on a different channel " + oldChannel.getRemoteAddress());
+ channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ }
+ // channel2sub is just a cache, so we can add to it
+ // without synchronization
+ channel2sub.put(channel, topicSub);
+ }
// First write success and then tell the delivery manager,
// otherwise the first message might go out before the response
// to the subscribe