You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/14 12:29:04 UTC
svn commit: r1300510 [1/3] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/
hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/
hedwig-client/src/main/cpp/test/ hedwig-client/src/ma...
Author: ivank
Date: Wed Mar 14 11:29:03 2012
New Revision: 1300510
URL: http://svn.apache.org/viewvc?rev=1300510&view=rev
Log:
BOOKKEEPER-168: Message bounding on subscriptions (ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
Removed:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/log4cpp.conf
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Mar 14 11:29:03 2012
@@ -86,6 +86,8 @@ Trunk (unreleased changes)
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
+ BOOKKEEPER-168: Message bounding on subscriptions (ivank)
+
bookkeeper-benchmark/
BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h Wed Mar 14 11:29:03 2012
@@ -47,6 +47,15 @@ namespace Hedwig {
static const std::string SYNC_REQUEST_TIMEOUT;
static const std::string SUBSCRIBER_AUTOCONSUME;
static const std::string NUM_DISPATCH_THREADS;
+ /**
+ * The maximum number of messages the hub will queue for subscriptions
+ * created using this configuration. The hub will always queue the most
+ * recent messages. If there are enough publishes to the topic to hit
+ * the bound, then the oldest messages are dropped from the queue.
+ *
+ * A bound of 0 disabled the bound completely.
+ */
+ static const std::string SUBSCRIPTION_MESSAGE_BOUND;
public:
Configuration() {};
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Wed Mar 14 11:29:03 2012
@@ -34,6 +34,8 @@ namespace Hedwig {
public:
virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) = 0;
virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) = 0;
+ virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) = 0;
+ virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) = 0;
virtual void unsubscribe(const std::string& topic, const std::string& subscriberId) = 0;
virtual void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) = 0;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp Wed Mar 14 11:29:03 2012
@@ -38,6 +38,7 @@ const std::string Configuration::RECONNE
const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout";
const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads";
+const std::string Configuration::SUBSCRIPTION_MESSAGE_BOUND = "hedwig.cpp.subscription_message_bound";
Client::Client(const Configuration& conf) {
LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Wed Mar 14 11:29:03 2012
@@ -38,14 +38,14 @@ PubSubDataPtr PubSubData::forPublishRequ
return ptr;
}
-PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode) {
+PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, const SubscriptionOptions& options) {
PubSubDataPtr ptr(new PubSubData());
ptr->type = SUBSCRIBE;
ptr->txnid = txnid;
ptr->subscriberid = subscriberid;
ptr->topic = topic;
ptr->callback = callback;
- ptr->mode = mode;
+ ptr->options = options;
return ptr;
}
@@ -69,7 +69,7 @@ PubSubDataPtr PubSubData::forConsumeRequ
return ptr;
}
-PubSubData::PubSubData() : shouldClaim(false) {
+PubSubData::PubSubData() : shouldClaim(false), messageBound(0) {
}
PubSubData::~PubSubData() {
@@ -116,7 +116,10 @@ const PubSubRequestPtr PubSubData::getRe
Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
subreq->set_subscriberid(subscriberid);
- subreq->set_createorattach(mode);
+ subreq->set_createorattach(options.createorattach());
+ if (options.messagebound() > 0) {
+ subreq->set_messagebound(options.messagebound());
+ }
} else if (type == CONSUME) {
LOG4CXX_DEBUG(logger, "Creating consume request");
@@ -164,6 +167,6 @@ const std::string& PubSubData::getSubscr
return subscriberid;
}
-SubscribeRequest::CreateOrAttach PubSubData::getMode() const {
- return mode;
+const SubscriptionOptions& PubSubData::getSubscriptionOptions() const {
+ return options;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h Wed Mar 14 11:29:03 2012
@@ -62,7 +62,8 @@ namespace Hedwig {
public:
// to be used for publish
static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const std::string& body, const OperationCallbackPtr& callback);
- static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode);
+ static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
+ const OperationCallbackPtr& callback, const SubscriptionOptions& options);
static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback);
static PubSubDataPtr forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid);
@@ -76,11 +77,12 @@ namespace Hedwig {
const MessageSeqId getMessageSeqId() const;
void setShouldClaim(bool shouldClaim);
+ void setMessageBound(int messageBound);
const PubSubRequestPtr getRequest();
void setCallback(const OperationCallbackPtr& callback);
OperationCallbackPtr& getCallback();
- SubscribeRequest::CreateOrAttach getMode() const;
+ const SubscriptionOptions& getSubscriptionOptions() const;
void addTriedServer(HostAddress& h);
bool hasTriedServer(HostAddress& h);
@@ -95,8 +97,9 @@ namespace Hedwig {
std::string topic;
std::string body;
bool shouldClaim;
+ int messageBound;
OperationCallbackPtr callback;
- SubscribeRequest::CreateOrAttach mode;
+ SubscriptionOptions options;
MessageSeqId msgid;
std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;
};
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Wed Mar 14 11:29:03 2012
@@ -36,6 +36,7 @@ const int DEFAULT_SUBSCRIBER_CONSUME_RET
const int DEFAULT_MAX_MESSAGE_QUEUE_SIZE = 10;
const int DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = 5000;
const bool DEFAULT_SUBSCRIBER_AUTOCONSUME = true;
+const int DEFAULT_SUBSCRIPTION_MESSAGE_BOUND = 0;
SubscriberWriteCallback::SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
@@ -298,17 +299,37 @@ SubscriberImpl::~SubscriberImpl()
void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) {
+ SubscriptionOptions options;
+ options.set_createorattach(mode);
+ subscribe(topic, subscriberId, options);
+}
+
+void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) {
SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,
DEFAULT_SYNC_REQUEST_TIMEOUT));
OperationCallbackPtr callback(cb);
- asyncSubscribe(topic, subscriberId, mode, callback);
+ asyncSubscribe(topic, subscriberId, options, callback);
cb->wait();
cb->throwExceptionIfNeeded();
}
void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) {
- PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, mode);
+ SubscriptionOptions options;
+ options.set_createorattach(mode);
+ asyncSubscribe(topic, subscriberId, options, callback);
+}
+
+void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) {
+ SubscriptionOptions options2 = options;
+
+ if (!options2.has_messagebound()) {
+ int messageBound = client->getConfiguration().getInt(Configuration::SUBSCRIPTION_MESSAGE_BOUND,
+ DEFAULT_SUBSCRIPTION_MESSAGE_BOUND);
+ options2.set_messagebound(messageBound);
+ }
+
+ PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, options2);
SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, *this, data));
ChannelHandlerPtr baseptr = handler;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Wed Mar 14 11:29:03 2012
@@ -145,6 +145,8 @@ namespace Hedwig {
void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode);
void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback);
+ void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options);
+ void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback);
void unsubscribe(const std::string& topic, const std::string& subscriberId);
void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf Wed Mar 14 11:29:03 2012
@@ -38,12 +38,12 @@ log4j.appender.hedwigtest.layout=org.apa
log4j.appender.hedwigtest.layout.ConversionPattern=%.5m%n
# category
-log4j.category.hedwig=DEBUG, hedwig
-log4j.rootCategory=DEBUG
+log4j.category.hedwig=INFO, hedwig
+log4j.rootCategory=INFO
#log4j.category.hedwig.channel=ERROR
log4j.category.hedwig.util=ERROR
log4j.category.hedwigtest.servercontrol=ERROR
-log4j.category.hedwigtest=DEBUG, hedwigtest
-log4j.rootCategory=DEBUG
+log4j.category.hedwigtest=INFO, hedwigtest
+log4j.rootCategory=INFO
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh Wed Mar 14 11:29:03 2012
@@ -19,6 +19,8 @@
cd `dirname $0`;
+export LOG4CXX_CONF=`pwd`/log4cxx.conf
+
source network-delays.sh
source server-control.sh
@@ -57,6 +59,41 @@ EOF
exit $RESULT
}
+singletest() {
+ if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then
+ setup_delays $HEDWIG_NETWORK_DELAY
+ fi
+
+ stop_cluster;
+ start_cluster;
+
+ ../test/hedwigtest $1
+ RESULT=$?
+ stop_cluster;
+
+ if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then
+ clear_delays
+ else
+ cat <<EOF
+
+The environment variable HEDWIG_NETWORK_DELAY is not set, so the tests were run directly
+with a localhost server. This isn't quite realistic as usually there will be some delay between
+the client and the hedwig server. Set HEDWIG_NETWORK_DELAY to the number of milliseconds you want
+to delay the packets between the server and client.
+
+ $ export HEDWIG_NETWORK_DELAY=500
+
+Requires root privileges.
+
+WARNING!!! This will modify your traffic shaping and firewall rules. If you do run with delays,
+check your firewall rules afterwards.
+
+EOF
+ fi
+
+ exit $RESULT
+}
+
case "$1" in
start-cluster)
start_cluster
@@ -73,6 +110,9 @@ case "$1" in
all)
all
;;
+ singletest)
+ singletest $2
+ ;;
*)
cat <<EOF
Usage: tester.sh [command]
@@ -80,6 +120,9 @@ Usage: tester.sh [command]
tester.sh all
Run through the tests, setting up and cleaning up all prerequisites.
+tester.sh singletest <name>
+ Run a single test
+
tester.sh start-cluster
Start a hedwig cluster
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am Wed Mar 14 11:29:03 2012
@@ -17,7 +17,7 @@
#
bin_PROGRAMS = hedwigtest
-hedwigtest_SOURCES = main.cpp utiltest.cpp pubsubdatatest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp
+hedwigtest_SOURCES = main.cpp utiltest.cpp pubsubdatatest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp
hedwigtest_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) $(TESTDEPS_CFLAGS) $(BOOST_CPPFLAGS)
hedwigtest_LDADD = $(DEPS_LIBS) $(TESTDEPS_LIBS) -L$(top_builddir)/lib -lhedwig01
hedwigtest_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp Wed Mar 14 11:29:03 2012
@@ -68,6 +68,7 @@ int main( int argc, char **argv)
registry.addRegistry("Subscribe");
registry.addRegistry("Publish");
registry.addRegistry("PubSub");
+ registry.addRegistry("MessageBound");
runner.addTest( registry.makeTest() );
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Wed Mar 14 11:29:03 2012
@@ -27,6 +27,7 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.util.Callback;
/**
@@ -82,6 +83,52 @@ public interface Subscriber {
public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
Object context);
+
+ /**
+ * Subscribe to the given topic for the inputted subscriberId.
+ *
+ * @param topic
+ * Topic name of the subscription
+ * @param subscriberId
+ * ID of the subscriber
+ * @param options
+ * Options to pass to the subscription. Can contain attach mode.
+ * @throws CouldNotConnectException
+ * If we are not able to connect to the server host
+ * @throws ClientAlreadySubscribedException
+ * If client is already subscribed to the topic
+ * @throws ServiceDownException
+ * If unable to subscribe to topic
+ * @throws InvalidSubscriberIdException
+ * If the subscriberId is not valid. We may want to set aside
+ * certain formats of subscriberId's for different purposes.
+ * e.g. local vs. hub subscriber
+ */
+ public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
+ throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+ InvalidSubscriberIdException;
+
+ /**
+ * Subscribe to the given topic asynchronously for the inputted subscriberId
+ * disregarding if the topic has been created yet or not.
+ *
+ * @param topic
+ * Topic name of the subscription
+ * @param subscriberId
+ * ID of the subscriber
+ * @param options
+ * Options to pass to the subscription. Can contain attach mode.
+ * @param callback
+ * Callback to invoke when the subscribe request to the server
+ * has actually gone through. This will have to deal with error
+ * conditions on the async subscribe request.
+ * @param context
+ * Calling context that the Callback needs since this is done
+ * asynchronously.
+ */
+ public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
+ Callback<Void> callback, Object context);
+
/**
* Unsubscribe from a topic that the subscriberId user has previously
* subscribed to.
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java Wed Mar 14 11:29:03 2012
@@ -41,6 +41,7 @@ public class ClientConfiguration extends
protected static final String SERVER_ACK_RESPONSE_TIMEOUT = "server_ack_response_timeout";
protected static final String TIMEOUT_THREAD_RUN_INTERVAL = "timeout_thread_run_interval";
protected static final String SSL_ENABLED = "ssl_enabled";
+ protected static final String SUBSCRIPTION_MESSAGE_BOUND = "subscription_message_bound";
// Singletons we want to instantiate only once per ClientConfiguration
protected HedwigSocketAddress myDefaultServerAddress = null;
@@ -138,6 +139,18 @@ public class ClientConfiguration extends
return conf.getBoolean(SSL_ENABLED, false);
}
+ /**
+ * The maximum number of messages the hub will queue for subscriptions
+ * created using this configuration. The hub will always queue the most
+ * recent messages. If there are enough publishes to the topic to hit
+ * the bound, then the oldest messages are dropped from the queue.
+ *
+ * A bound of 0 disabled the bound completely. This is the default.
+ */
+ public int getSubscriptionMessageBound() {
+ return conf.getInt(SUBSCRIPTION_MESSAGE_BOUND, 0);
+ }
+
// Validate that the configuration properties are valid.
public void validate() throws ConfigurationException {
if (isSSLEnabled() && getDefaultServerHedwigSocketAddress().getSSLSocketAddress() == null) {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java Wed Mar 14 11:29:03 2012
@@ -22,7 +22,7 @@ import java.util.List;
import com.google.protobuf.ByteString;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.util.Callback;
/**
@@ -45,10 +45,8 @@ public class PubSubData {
// Enum to indicate what type of operation this PubSub request data object
// is for.
public final OperationType operationType;
- // Enum for subscribe requests to indicate if this is a CREATE, ATTACH, or
- // CREATE_OR_ATTACH subscription request. For non-subscribe requests,
- // this will be null.
- public final CreateOrAttach createOrAttach;
+ // Options for the subscription
+ public final SubscriptionOptions options;
// These two variables are not final since we might override them
// in the case of a Subscribe reconnect.
public Callback<Void> callback;
@@ -84,13 +82,13 @@ public class PubSubData {
// Constructor for all types of PubSub request data to send to the server
public PubSubData(final ByteString topic, final Message msg, final ByteString subscriberId,
- final OperationType operationType, final CreateOrAttach createOrAttach, final Callback<Void> callback,
- final Object context) {
+ final OperationType operationType, final SubscriptionOptions options,
+ final Callback<Void> callback, final Object context) {
this.topic = topic;
this.msg = msg;
this.subscriberId = subscriberId;
this.operationType = operationType;
- this.createOrAttach = createOrAttach;
+ this.options = options;
this.callback = callback;
this.context = context;
}
@@ -117,8 +115,9 @@ public class PubSubData {
sb.append(COMMA).append("SubscriberId: " + subscriberId.toStringUtf8());
if (operationType != null)
sb.append(COMMA).append("Operation Type: " + operationType.toString());
- if (createOrAttach != null)
- sb.append(COMMA).append("Create Or Attach: " + createOrAttach.toString());
+ if (options != null)
+ sb.append(COMMA).append("Create Or Attach: " + options.getCreateOrAttach().toString())
+ .append(COMMA).append("Message Bound: " + options.getMessageBound());
if (triedServers != null && triedServers.size() > 0) {
sb.append(COMMA).append("Tried Servers: ");
for (ByteString triedServer : triedServers) {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Wed Mar 14 11:29:03 2012
@@ -48,6 +48,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
@@ -80,16 +81,17 @@ public class HedwigSubscriber implements
// two flows are very similar. The assumption is that the input
// OperationType is either SUBSCRIBE or UNSUBSCRIBE.
private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
- CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
+ SubscriptionOptions options)
+ throws CouldNotConnectException, ClientAlreadySubscribedException,
ClientNotSubscribedException, ServiceDownException {
if (logger.isDebugEnabled())
logger.debug("Calling a sync subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
+ subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
- + createOrAttach);
- PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, null, null);
+ + options.getCreateOrAttach() + ", messageBound: " + options.getMessageBound());
+ PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
synchronized (pubSubData) {
PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
- asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, createOrAttach);
+ asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
try {
while (!pubSubData.isDone)
pubSubData.wait();
@@ -133,14 +135,14 @@ public class HedwigSubscriber implements
// flows are very similar. The assumption is that the input OperationType is
// either SUBSCRIBE or UNSUBSCRIBE.
private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context,
- OperationType operationType, CreateOrAttach createOrAttach) {
+ OperationType operationType, SubscriptionOptions options) {
if (logger.isDebugEnabled())
logger.debug("Calling an async subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
+ subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
- + createOrAttach);
+ + options.getCreateOrAttach() + ", messageBound: " + options.getMessageBound());
// Check if we know which server host is the master for the topic we are
// subscribing to.
- PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, callback,
+ PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, callback,
context);
if (client.topic2Host.containsKey(topic)) {
InetSocketAddress host = client.topic2Host.get(topic);
@@ -175,10 +177,17 @@ public class HedwigSubscriber implements
public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
InvalidSubscriberIdException {
- subscribe(topic, subscriberId, mode, false);
+ SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+ subscribe(topic, subscriberId, options, false);
}
- protected void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, boolean isHub)
+ public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
+ throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+ InvalidSubscriberIdException {
+ subscribe(topic, subscriberId, options, false);
+ }
+
+ protected void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options, boolean isHub)
throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
InvalidSubscriberIdException {
// Validate that the format of the subscriberId is valid either as a
@@ -188,7 +197,7 @@ public class HedwigSubscriber implements
+ ", isHub: " + isHub);
}
try {
- subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, mode);
+ subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
} catch (ClientNotSubscribedException e) {
logger.error("Unexpected Exception thrown: " + e.toString());
// This exception should never be thrown here. But just in case,
@@ -200,10 +209,17 @@ public class HedwigSubscriber implements
public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
Object context) {
- asyncSubscribe(topic, subscriberId, mode, callback, context, false);
+ SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+ asyncSubscribe(topic, subscriberId, options, callback, context, false);
}
- protected void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode,
+ public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
+ Callback<Void> callback, Object context) {
+ asyncSubscribe(topic, subscriberId, options, callback, context, false);
+ }
+
+ protected void asyncSubscribe(ByteString topic, ByteString subscriberId,
+ SubscriptionOptions options,
Callback<Void> callback, Object context, boolean isHub) {
// Validate that the format of the subscriberId is valid either as a
// local or hub subscriber.
@@ -212,7 +228,7 @@ public class HedwigSubscriber implements
"SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
return;
}
- asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, mode);
+ asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, options);
}
public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
@@ -333,11 +349,17 @@ public class HedwigSubscriber implements
// Create the SubscribeRequest
SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
- subscribeRequestBuilder.setCreateOrAttach(pubSubData.createOrAttach);
+ subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
// For now, all subscribes should wait for all cross-regional
// subscriptions to be established before returning.
subscribeRequestBuilder.setSynchronous(true);
+ if (pubSubData.options.getMessageBound() > 0) {
+ subscribeRequestBuilder.setMessageBound(pubSubData.options.getMessageBound());
+ } else if (cfg.getSubscriptionMessageBound() > 0) {
+ subscribeRequestBuilder.setMessageBound(cfg.getSubscriptionMessageBound());
+ }
+
// Set the SubscribeRequest into the outer PubSubRequest
pubsubRequestBuilder.setSubscribeRequest(subscribeRequestBuilder);
} else {