You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/14 12:29:04 UTC

svn commit: r1300510 [1/3] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/ hedwig-client/src/main/cpp/test/ hedwig-client/src/ma...

Author: ivank
Date: Wed Mar 14 11:29:03 2012
New Revision: 1300510

URL: http://svn.apache.org/viewvc?rev=1300510&view=rev
Log:
BOOKKEEPER-168: Message bounding on subscriptions (ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
Removed:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/log4cpp.conf
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Mar 14 11:29:03 2012
@@ -86,6 +86,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
 
+        BOOKKEEPER-168: Message bounding on subscriptions (ivank)
+
       bookkeeper-benchmark/
 	BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h Wed Mar 14 11:29:03 2012
@@ -47,6 +47,15 @@ namespace Hedwig {
     static const std::string SYNC_REQUEST_TIMEOUT;
     static const std::string SUBSCRIBER_AUTOCONSUME;
     static const std::string NUM_DISPATCH_THREADS;
+    /**
+     * The maximum number of messages the hub will queue for subscriptions
+     * created using this configuration. The hub will always queue the most
+     * recent messages. If there are enough publishes to the topic to hit
+     * the bound, then the oldest messages are dropped from the queue.
+     *
+     * A bound of 0 disables the bound completely.
+     */
+    static const std::string SUBSCRIPTION_MESSAGE_BOUND;
 
   public:
     Configuration() {};

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Wed Mar 14 11:29:03 2012
@@ -34,6 +34,8 @@ namespace Hedwig {
   public:
     virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) = 0;
     virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) = 0;
+    virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) = 0;
+    virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) = 0;
     
     virtual void unsubscribe(const std::string& topic, const std::string& subscriberId) = 0;
     virtual void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) = 0;  

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp Wed Mar 14 11:29:03 2012
@@ -38,6 +38,7 @@ const std::string Configuration::RECONNE
 const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout";
 const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
 const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads";
+const std::string Configuration::SUBSCRIPTION_MESSAGE_BOUND = "hedwig.cpp.subscription_message_bound";
 
 Client::Client(const Configuration& conf) {
   LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Wed Mar 14 11:29:03 2012
@@ -38,14 +38,14 @@ PubSubDataPtr PubSubData::forPublishRequ
   return ptr;
 }
 
-PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode) {
+PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, const SubscriptionOptions& options) {
   PubSubDataPtr ptr(new PubSubData());
   ptr->type = SUBSCRIBE;
   ptr->txnid = txnid;
   ptr->subscriberid = subscriberid;
   ptr->topic = topic;
   ptr->callback = callback;
-  ptr->mode = mode;
+  ptr->options = options;
   return ptr;  
 }
 
@@ -69,7 +69,7 @@ PubSubDataPtr PubSubData::forConsumeRequ
   return ptr;  
 }
 
-PubSubData::PubSubData() : shouldClaim(false) {  
+PubSubData::PubSubData() : shouldClaim(false), messageBound(0) {  
 }
 
 PubSubData::~PubSubData() {
@@ -116,7 +116,10 @@ const PubSubRequestPtr PubSubData::getRe
 
     Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
     subreq->set_subscriberid(subscriberid);
-    subreq->set_createorattach(mode);
+    subreq->set_createorattach(options.createorattach());
+    if (options.messagebound() > 0) {
+      subreq->set_messagebound(options.messagebound());
+    }
   } else if (type == CONSUME) {
     LOG4CXX_DEBUG(logger, "Creating consume request");
 
@@ -164,6 +167,6 @@ const std::string& PubSubData::getSubscr
   return subscriberid;
 }
 
-SubscribeRequest::CreateOrAttach PubSubData::getMode() const {
-  return mode;
+const SubscriptionOptions& PubSubData::getSubscriptionOptions() const {
+  return options;
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h Wed Mar 14 11:29:03 2012
@@ -62,7 +62,8 @@ namespace Hedwig {
   public:
     // to be used for publish
     static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const std::string& body, const OperationCallbackPtr& callback);
-    static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode);
+    static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
+					     const OperationCallbackPtr& callback, const SubscriptionOptions& options);
     static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback);
     static PubSubDataPtr forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid);
 
@@ -76,11 +77,12 @@ namespace Hedwig {
     const MessageSeqId getMessageSeqId() const;
 
     void setShouldClaim(bool shouldClaim);
+    void setMessageBound(int messageBound);
 
     const PubSubRequestPtr getRequest();
     void setCallback(const OperationCallbackPtr& callback);
     OperationCallbackPtr& getCallback();
-    SubscribeRequest::CreateOrAttach getMode() const;
+    const SubscriptionOptions& getSubscriptionOptions() const;
 
     void addTriedServer(HostAddress& h);
     bool hasTriedServer(HostAddress& h);
@@ -95,8 +97,9 @@ namespace Hedwig {
     std::string topic;
     std::string body;
     bool shouldClaim;
+    int messageBound;
     OperationCallbackPtr callback;
-    SubscribeRequest::CreateOrAttach mode;
+    SubscriptionOptions options;
     MessageSeqId msgid;
     std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;
   };

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Wed Mar 14 11:29:03 2012
@@ -36,6 +36,7 @@ const int DEFAULT_SUBSCRIBER_CONSUME_RET
 const int DEFAULT_MAX_MESSAGE_QUEUE_SIZE = 10;
 const int DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = 5000;
 const bool DEFAULT_SUBSCRIBER_AUTOCONSUME = true;
+const int DEFAULT_SUBSCRIPTION_MESSAGE_BOUND = 0;
 
 SubscriberWriteCallback::SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
 
@@ -298,17 +299,37 @@ SubscriberImpl::~SubscriberImpl() 
 
 
 void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) {
+  SubscriptionOptions options;
+  options.set_createorattach(mode);
+  subscribe(topic, subscriberId, options);
+}
+
+void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) {
   SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT, 
 											  DEFAULT_SYNC_REQUEST_TIMEOUT));
   OperationCallbackPtr callback(cb);
-  asyncSubscribe(topic, subscriberId, mode, callback);
+  asyncSubscribe(topic, subscriberId, options, callback);
   cb->wait();
   
   cb->throwExceptionIfNeeded();  
 }
 
 void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) {
-  PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, mode);
+  SubscriptionOptions options;
+  options.set_createorattach(mode);
+  asyncSubscribe(topic, subscriberId, options, callback);
+}
+
+void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) {
+  SubscriptionOptions options2 = options;
+
+  if (!options2.has_messagebound()) {
+    int messageBound = client->getConfiguration().getInt(Configuration::SUBSCRIPTION_MESSAGE_BOUND,
+							 DEFAULT_SUBSCRIPTION_MESSAGE_BOUND);
+    options2.set_messagebound(messageBound);
+  }
+
+  PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, options2);
 
   SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, *this, data));
   ChannelHandlerPtr baseptr = handler;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Wed Mar 14 11:29:03 2012
@@ -145,6 +145,8 @@ namespace Hedwig {
 
     void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode);
     void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback);
+    void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options);
+    void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback);
     
     void unsubscribe(const std::string& topic, const std::string& subscriberId);
     void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf Wed Mar 14 11:29:03 2012
@@ -38,12 +38,12 @@ log4j.appender.hedwigtest.layout=org.apa
 log4j.appender.hedwigtest.layout.ConversionPattern=%.5m%n
 
 # category
-log4j.category.hedwig=DEBUG, hedwig
-log4j.rootCategory=DEBUG
+log4j.category.hedwig=INFO, hedwig
+log4j.rootCategory=INFO
 
 #log4j.category.hedwig.channel=ERROR
 log4j.category.hedwig.util=ERROR
 log4j.category.hedwigtest.servercontrol=ERROR
 
-log4j.category.hedwigtest=DEBUG, hedwigtest
-log4j.rootCategory=DEBUG
+log4j.category.hedwigtest=INFO, hedwigtest
+log4j.rootCategory=INFO

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh Wed Mar 14 11:29:03 2012
@@ -19,6 +19,8 @@
 
 cd `dirname $0`;
 
+export LOG4CXX_CONF=`pwd`/log4cxx.conf
+
 source network-delays.sh
 source server-control.sh
 
@@ -57,6 +59,41 @@ EOF
     exit $RESULT
 }
 
+singletest() {
+    if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then
+	setup_delays $HEDWIG_NETWORK_DELAY
+    fi
+
+    stop_cluster;
+    start_cluster;
+
+    ../test/hedwigtest $1
+    RESULT=$?
+    stop_cluster;
+
+    if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then
+	clear_delays
+    else
+	cat <<EOF
+
+The environment variable HEDWIG_NETWORK_DELAY is not set, so the tests were run directly 
+with a localhost server. This isn't quite realistic as usually there will be some delay between 
+the client and the hedwig server. Set HEDWIG_NETWORK_DELAY to the number of milliseconds you want
+to delay the packets between the server and client. 
+
+ $ export HEDWIG_NETWORK_DELAY=500
+
+Requires root privileges.
+
+WARNING!!! This will modify your traffic shaping and firewall rules. If you do run with delays, 
+check your firewall rules afterwards.
+
+EOF
+    fi
+
+    exit $RESULT
+}
+
 case "$1" in
     start-cluster)
 	start_cluster
@@ -73,6 +110,9 @@ case "$1" in
     all)
 	all
 	;;
+    singletest)
+	singletest $2
+	;;
     *)
 	cat <<EOF
 Usage: tester.sh [command]
@@ -80,6 +120,9 @@ Usage: tester.sh [command]
 tester.sh all
    Run through the tests, setting up and cleaning up all prerequisites.
 
+tester.sh singletest <name>
+   Run a single test
+
 tester.sh start-cluster
    Start a hedwig cluster
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am Wed Mar 14 11:29:03 2012
@@ -17,7 +17,7 @@
 #
 
 bin_PROGRAMS = hedwigtest
-hedwigtest_SOURCES = main.cpp utiltest.cpp pubsubdatatest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp
+hedwigtest_SOURCES = main.cpp utiltest.cpp pubsubdatatest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp
 hedwigtest_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) $(TESTDEPS_CFLAGS) $(BOOST_CPPFLAGS) 
 hedwigtest_LDADD = $(DEPS_LIBS) $(TESTDEPS_LIBS) -L$(top_builddir)/lib -lhedwig01 
 hedwigtest_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp Wed Mar 14 11:29:03 2012
@@ -68,6 +68,7 @@ int main( int argc, char **argv)
     registry.addRegistry("Subscribe");
     registry.addRegistry("Publish"); 
     registry.addRegistry("PubSub");
+    registry.addRegistry("MessageBound");
     
     runner.addTest( registry.makeTest() );
   }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Wed Mar 14 11:29:03 2012
@@ -27,6 +27,7 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.util.Callback;
 
 /**
@@ -82,6 +83,52 @@ public interface Subscriber {
     public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
                                Object context);
 
+
+    /**
+     * Subscribe to the given topic for the inputted subscriberId.
+     *
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param options
+     *            Options to pass to the subscription. Can contain attach mode.
+     * @throws CouldNotConnectException
+     *             If we are not able to connect to the server host
+     * @throws ClientAlreadySubscribedException
+     *             If client is already subscribed to the topic
+     * @throws ServiceDownException
+     *             If unable to subscribe to topic
+     * @throws InvalidSubscriberIdException
+     *             If the subscriberId is not valid. We may want to set aside
+     *             certain formats of subscriberId's for different purposes.
+     *             e.g. local vs. hub subscriber
+     */
+    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
+            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+        InvalidSubscriberIdException;
+
+    /**
+     * Subscribe to the given topic asynchronously for the inputted subscriberId
+     * disregarding if the topic has been created yet or not.
+     *
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param options
+     *            Options to pass to the subscription. Can contain attach mode.
+     * @param callback
+     *            Callback to invoke when the subscribe request to the server
+     *            has actually gone through. This will have to deal with error
+     *            conditions on the async subscribe request.
+     * @param context
+     *            Calling context that the Callback needs since this is done
+     *            asynchronously.
+     */
+    public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
+                               Callback<Void> callback, Object context);
+
     /**
      * Unsubscribe from a topic that the subscriberId user has previously
      * subscribed to.

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java Wed Mar 14 11:29:03 2012
@@ -41,6 +41,7 @@ public class ClientConfiguration extends
     protected static final String SERVER_ACK_RESPONSE_TIMEOUT = "server_ack_response_timeout";
     protected static final String TIMEOUT_THREAD_RUN_INTERVAL = "timeout_thread_run_interval";
     protected static final String SSL_ENABLED = "ssl_enabled";
+    protected static final String SUBSCRIPTION_MESSAGE_BOUND = "subscription_message_bound";
 
     // Singletons we want to instantiate only once per ClientConfiguration
     protected HedwigSocketAddress myDefaultServerAddress = null;
@@ -138,6 +139,18 @@ public class ClientConfiguration extends
         return conf.getBoolean(SSL_ENABLED, false);
     }
 
+    /**
+     * The maximum number of messages the hub will queue for subscriptions
+     * created using this configuration. The hub will always queue the most
+     * recent messages. If there are enough publishes to the topic to hit
+     * the bound, then the oldest messages are dropped from the queue.
+     *
+     * A bound of 0 disables the bound completely. This is the default.
+     */
+    public int getSubscriptionMessageBound() {
+        return conf.getInt(SUBSCRIPTION_MESSAGE_BOUND, 0);
+    }
+
     // Validate that the configuration properties are valid.
     public void validate() throws ConfigurationException {
         if (isSSLEnabled() && getDefaultServerHedwigSocketAddress().getSSLSocketAddress() == null) {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java Wed Mar 14 11:29:03 2012
@@ -22,7 +22,7 @@ import java.util.List;
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.util.Callback;
 
 /**
@@ -45,10 +45,8 @@ public class PubSubData {
     // Enum to indicate what type of operation this PubSub request data object
     // is for.
     public final OperationType operationType;
-    // Enum for subscribe requests to indicate if this is a CREATE, ATTACH, or
-    // CREATE_OR_ATTACH subscription request. For non-subscribe requests,
-    // this will be null.
-    public final CreateOrAttach createOrAttach;
+    // Options for the subscription
+    public final SubscriptionOptions options;
     // These two variables are not final since we might override them
     // in the case of a Subscribe reconnect.
     public Callback<Void> callback;
@@ -84,13 +82,13 @@ public class PubSubData {
 
     // Constructor for all types of PubSub request data to send to the server
     public PubSubData(final ByteString topic, final Message msg, final ByteString subscriberId,
-                      final OperationType operationType, final CreateOrAttach createOrAttach, final Callback<Void> callback,
-                      final Object context) {
+                      final OperationType operationType, final SubscriptionOptions options,
+                      final Callback<Void> callback, final Object context) {
         this.topic = topic;
         this.msg = msg;
         this.subscriberId = subscriberId;
         this.operationType = operationType;
-        this.createOrAttach = createOrAttach;
+        this.options = options;
         this.callback = callback;
         this.context = context;
     }
@@ -117,8 +115,9 @@ public class PubSubData {
             sb.append(COMMA).append("SubscriberId: " + subscriberId.toStringUtf8());
         if (operationType != null)
             sb.append(COMMA).append("Operation Type: " + operationType.toString());
-        if (createOrAttach != null)
-            sb.append(COMMA).append("Create Or Attach: " + createOrAttach.toString());
+        if (options != null)
+            sb.append(COMMA).append("Create Or Attach: " + options.getCreateOrAttach().toString())
+                .append(COMMA).append("Message Bound: " + options.getMessageBound());
         if (triedServers != null && triedServers.size() > 0) {
             sb.append(COMMA).append("Tried Servers: ");
             for (ByteString triedServer : triedServers) {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Wed Mar 14 11:29:03 2012
@@ -48,6 +48,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.util.Callback;
@@ -80,16 +81,17 @@ public class HedwigSubscriber implements
     // two flows are very similar. The assumption is that the input
     // OperationType is either SUBSCRIBE or UNSUBSCRIBE.
     private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
-                          CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
+                          SubscriptionOptions options)
+            throws CouldNotConnectException, ClientAlreadySubscribedException,
         ClientNotSubscribedException, ServiceDownException {
         if (logger.isDebugEnabled())
             logger.debug("Calling a sync subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
                          + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
-                         + createOrAttach);
-        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, null, null);
+                         + options.getCreateOrAttach() + ", messageBound: " + options.getMessageBound());
+        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
         synchronized (pubSubData) {
             PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
-            asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, createOrAttach);
+            asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
             try {
                 while (!pubSubData.isDone)
                     pubSubData.wait();
@@ -133,14 +135,14 @@ public class HedwigSubscriber implements
     // flows are very similar. The assumption is that the input OperationType is
     // either SUBSCRIBE or UNSUBSCRIBE.
     private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context,
-                               OperationType operationType, CreateOrAttach createOrAttach) {
+                               OperationType operationType, SubscriptionOptions options) {
         if (logger.isDebugEnabled())
             logger.debug("Calling an async subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
                          + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
-                         + createOrAttach);
+                         + options.getCreateOrAttach() + ", messageBound: " + options.getMessageBound());
         // Check if we know which server host is the master for the topic we are
         // subscribing to.
-        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, callback,
+        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, callback,
                                                context);
         if (client.topic2Host.containsKey(topic)) {
             InetSocketAddress host = client.topic2Host.get(topic);
@@ -175,10 +177,17 @@ public class HedwigSubscriber implements
     public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
             throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
         InvalidSubscriberIdException {
-        subscribe(topic, subscriberId, mode, false);
+        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+        subscribe(topic, subscriberId, options, false);
     }
 
-    protected void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, boolean isHub)
+    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
+            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+         InvalidSubscriberIdException {
+        subscribe(topic, subscriberId, options, false);
+    }
+
+    protected void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options, boolean isHub)
             throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
         InvalidSubscriberIdException {
         // Validate that the format of the subscriberId is valid either as a
@@ -188,7 +197,7 @@ public class HedwigSubscriber implements
                                                    + ", isHub: " + isHub);
         }
         try {
-            subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, mode);
+            subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
         } catch (ClientNotSubscribedException e) {
             logger.error("Unexpected Exception thrown: " + e.toString());
             // This exception should never be thrown here. But just in case,
@@ -200,10 +209,17 @@ public class HedwigSubscriber implements
 
     public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
                                Object context) {
-        asyncSubscribe(topic, subscriberId, mode, callback, context, false);
+        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+        asyncSubscribe(topic, subscriberId, options, callback, context, false);
     }
 
-    protected void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode,
+    public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
+                               Callback<Void> callback, Object context) {
+        asyncSubscribe(topic, subscriberId, options, callback, context, false);
+    }
+
+    protected void asyncSubscribe(ByteString topic, ByteString subscriberId,
+                                  SubscriptionOptions options,
                                   Callback<Void> callback, Object context, boolean isHub) {
         // Validate that the format of the subscriberId is valid either as a
         // local or hub subscriber.
@@ -212,7 +228,7 @@ public class HedwigSubscriber implements
                                          "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
             return;
         }
-        asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, mode);
+        asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, options);
     }
 
     public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
@@ -333,11 +349,17 @@ public class HedwigSubscriber implements
             // Create the SubscribeRequest
             SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
             subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-            subscribeRequestBuilder.setCreateOrAttach(pubSubData.createOrAttach);
+            subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
             // For now, all subscribes should wait for all cross-regional
             // subscriptions to be established before returning.
             subscribeRequestBuilder.setSynchronous(true);
 
+            if (pubSubData.options.getMessageBound() > 0) {
+                subscribeRequestBuilder.setMessageBound(pubSubData.options.getMessageBound());
+            } else if (cfg.getSubscriptionMessageBound() > 0) {
+                subscribeRequestBuilder.setMessageBound(cfg.getSubscriptionMessageBound());
+            }
+
             // Set the SubscribeRequest into the outer PubSubRequest
             pubsubRequestBuilder.setSubscribeRequest(subscribeRequestBuilder);
         } else {