You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/03 17:27:12 UTC
svn commit: r1380273 - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/
hedwig-client/src/main/cpp/test/
Author: ivank
Date: Mon Sep 3 15:27:11 2012
New Revision: 1380273
URL: http://svn.apache.org/viewvc?rev=1380273&view=rev
Log:
BOOKKEEPER-335: client-side message filter for cpp client. (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
- copied, changed from r1380268, zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Sep 3 15:27:11 2012
@@ -136,6 +136,8 @@ Trunk (unreleased changes)
BOOKKEEPER-334: client-side message filter for java client. (sijie via ivank)
+ BOOKKEEPER-335: client-side message filter for cpp client. (sijie via ivank)
+
Release 4.1.0 - 2012-06-07
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h Mon Sep 3 15:27:11 2012
@@ -55,6 +55,18 @@ namespace Hedwig {
virtual ~MessageHandlerCallback() {};
};
typedef std::tr1::shared_ptr<MessageHandlerCallback> MessageHandlerCallbackPtr;
+
+ typedef std::tr1::shared_ptr<SubscriptionPreferences> SubscriptionPreferencesPtr;
+
+ class ClientMessageFilter {
+ public:
+ virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+ const SubscriptionPreferencesPtr& preferences) = 0;
+ virtual bool testMessage(const Message& message) = 0;
+
+ virtual ~ClientMessageFilter() {};
+ };
+ typedef std::tr1::shared_ptr<ClientMessageFilter> ClientMessageFilterPtr;
}
#endif
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h Mon Sep 3 15:27:11 2012
@@ -39,6 +39,8 @@ namespace Hedwig {
class SubscriberException : public ClientException { };
class AlreadySubscribedException : public SubscriberException {};
class NotSubscribedException : public SubscriberException {};
+ class NullMessageHandlerException : public SubscriberException {};
+ class NullMessageFilterException : public SubscriberException {};
class ConfigurationException : public ClientException { };
class InvalidPortException : public ConfigurationException {};
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Mon Sep 3 15:27:11 2012
@@ -42,7 +42,13 @@ namespace Hedwig {
virtual void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) = 0;
- virtual void startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) = 0;
+ virtual void startDelivery(const std::string& topic, const std::string& subscriberId,
+ const MessageHandlerCallbackPtr& callback) = 0;
+ virtual void startDeliveryWithFilter(const std::string& topic,
+ const std::string& subscriberId,
+ const MessageHandlerCallbackPtr& callback,
+ const ClientMessageFilterPtr& filter) = 0;
+
virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0;
virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am Mon Sep 3 15:27:11 2012
@@ -19,7 +19,7 @@
PROTODEF = ../../../../../hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
lib_LTLIBRARIES = libhedwig01.la
-libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp
+libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp
libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS)
libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS)
libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp?rev=1380273&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp Mon Sep 3 15:27:11 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "filterablemessagehandler.h"
+
+using namespace Hedwig;
+
+FilterableMessageHandler::FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
+ const ClientMessageFilterPtr& msgFilter)
+ : msgHandler(msgHandler), msgFilter(msgFilter) {
+}
+
+FilterableMessageHandler::~FilterableMessageHandler() {
+}
+
+void FilterableMessageHandler::consume(const std::string& topic, const std::string& subscriberId,
+ const Message& msg, OperationCallbackPtr& callback) {
+ bool deliver = true;
+ if (0 != msgFilter.get()) {
+ deliver = msgFilter->testMessage(msg);
+ }
+ if (deliver) {
+ msgHandler->consume(topic, subscriberId, msg, callback);
+ } else {
+ callback->operationComplete();
+ }
+}
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h (from r1380268, zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h&r1=1380268&r2=1380273&rev=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h Mon Sep 3 15:27:11 2012
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef HEDWIG_CALLBACK_H
-#define HEDWIG_CALLBACK_H
+#ifndef FILTERABLE_MESSAGE_HANDLER_H
+#define FILTERABLE_MESSAGE_HANDLER_H
-#include <string>
-#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
#include <hedwig/protocol.h>
#ifdef USE_BOOST_TR1
@@ -30,31 +29,21 @@
namespace Hedwig {
- template<class R>
- class Callback {
+ class FilterableMessageHandler : public MessageHandlerCallback {
public:
- virtual void operationComplete(const R& result) = 0;
- virtual void operationFailed(const std::exception& exception) = 0;
+ FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
+ const ClientMessageFilterPtr& msgFilter);
- virtual ~Callback() {};
- };
+ virtual void consume(const std::string& topic, const std::string& subscriberId,
+ const Message& msg, OperationCallbackPtr& callback);
- class OperationCallback {
- public:
- virtual void operationComplete() = 0;
- virtual void operationFailed(const std::exception& exception) = 0;
-
- virtual ~OperationCallback() {};
+ virtual ~FilterableMessageHandler();
+ private:
+ const MessageHandlerCallbackPtr msgHandler;
+ const ClientMessageFilterPtr msgFilter;
};
- typedef std::tr1::shared_ptr<OperationCallback> OperationCallbackPtr;
- class MessageHandlerCallback {
- public:
- virtual void consume(const std::string& topic, const std::string& subscriberId, const Message& msg, OperationCallbackPtr& callback) = 0;
-
- virtual ~MessageHandlerCallback() {};
- };
- typedef std::tr1::shared_ptr<MessageHandlerCallback> MessageHandlerCallbackPtr;
-}
+};
#endif
+
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Mon Sep 3 15:27:11 2012
@@ -22,6 +22,7 @@
#include "subscriberimpl.h"
#include "util.h"
#include "channel.h"
+#include "filterablemessagehandler.h"
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -405,7 +406,28 @@ void SubscriberImpl::consume(const std::
channel->writeRequest(data->getRequest(), writecb);
}
-void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) {
+void SubscriberImpl::startDeliveryWithFilter(const std::string& topic,
+ const std::string& subscriberId,
+ const MessageHandlerCallbackPtr& callback,
+ const ClientMessageFilterPtr& filter) {
+ if (0 == filter.get()) {
+ throw NullMessageFilterException();
+ }
+ if (0 == callback.get()) {
+ throw NullMessageHandlerException();
+ }
+ const SubscriptionPreferencesPtr& preferences =
+ getSubscriptionPreferences(topic, subscriberId);
+ if (0 == preferences.get()) {
+ throw NotSubscribedException();
+ }
+ filter->setSubscriptionPreferences(topic, subscriberId, preferences);
+ MessageHandlerCallbackPtr filterableHandler(new FilterableMessageHandler(callback, filter));
+ startDelivery(topic, subscriberId, filterableHandler);
+}
+
+void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId,
+ const MessageHandlerCallbackPtr& callback) {
TopicSubscriber t(topic, subscriberId);
boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
@@ -446,6 +468,22 @@ void SubscriberImpl::closeSubscription(c
}
}
+void SubscriberImpl::setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+ const SubscriptionPreferences& preferences) {
+ boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2preferences_lock);
+ TopicSubscriber t(topic, subscriberId);
+ SubscriptionPreferencesPtr newPreferences(new SubscriptionPreferences(preferences));
+ topicsubscriber2preferences[t] = newPreferences;
+}
+
+const SubscriptionPreferencesPtr& SubscriberImpl::getSubscriptionPreferences(
+ const std::string& topic, const std::string& subscriberId) {
+ boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2preferences_lock);
+ TopicSubscriber t(topic, subscriberId);
+ const SubscriptionPreferencesPtr &preferences = topicsubscriber2preferences[t];
+ return preferences;
+}
+
/**
takes ownership of txn
*/
@@ -459,6 +497,19 @@ void SubscriberImpl::messageHandler(cons
switch (m->statuscode()) {
case SUCCESS:
+ // for subscribe request, check whether is any subscription preferences received
+ if (SUBSCRIBE == txn->getType()) {
+ if (m->has_responsebody()) {
+ const ResponseBody& respBody = m->responsebody();
+ if (respBody.has_subscriberesponse()) {
+ const SubscribeResponse& resp = respBody.subscriberesponse();
+ if (resp.has_preferences()) {
+ setSubscriptionPreferences(txn->getTopic(), txn->getSubscriberId(),
+ resp.preferences());
+ }
+ }
+ }
+ }
if (m->has_responsebody()) {
txn->getCallback()->operationComplete(m->responsebody());
} else {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Mon Sep 3 15:27:11 2012
@@ -153,7 +153,11 @@ namespace Hedwig {
void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId);
- void startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback);
+ void startDelivery(const std::string& topic, const std::string& subscriberId,
+ const MessageHandlerCallbackPtr& callback);
+ void startDeliveryWithFilter(const std::string& topic, const std::string& subscriberId,
+ const MessageHandlerCallbackPtr& callback,
+ const ClientMessageFilterPtr& filter);
void stopDelivery(const std::string& topic, const std::string& subscriberId);
void closeSubscription(const std::string& topic, const std::string& subscriberId);
@@ -164,10 +168,18 @@ namespace Hedwig {
void doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
private:
+ void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+ const SubscriptionPreferences& preferences);
+ const SubscriptionPreferencesPtr& getSubscriptionPreferences(
+ const std::string& topic, const std::string& subscriberId);
+
+ private:
const ClientImplPtr client;
-
+
std::tr1::unordered_map<TopicSubscriber, SubscriberClientChannelHandlerPtr, TopicSubscriberHash > topicsubscriber2handler;
boost::shared_mutex topicsubscriber2handler_lock;
+ std::tr1::unordered_map<TopicSubscriber, SubscriptionPreferencesPtr, TopicSubscriberHash> topicsubscriber2preferences;
+ boost::shared_mutex topicsubscriber2preferences_lock;
};
};
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am Mon Sep 3 15:27:11 2012
@@ -18,7 +18,7 @@
if HAVE_GTEST
bin_PROGRAMS = hedwigtest
-hedwigtest_SOURCES = main.cpp utiltest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp
+hedwigtest_SOURCES = main.cpp utiltest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp messagefiltertest.cpp
hedwigtest_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) $(GTEST_CPPFLAGS) $(BOOST_CPPFLAGS)
hedwigtest_CXXFLAGS = $(GTEST_CXXFLAGS)
hedwigtest_LDADD = $(DEPS_LIBS) $(GTEST_LIBS) -L$(top_builddir)/lib -lhedwig01
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp?rev=1380273&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp Mon Sep 3 15:27:11 2012
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "gtest/gtest.h"
+
+#include "../lib/clientimpl.h"
+#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
+#include <stdexcept>
+#include <pthread.h>
+
+#include <log4cxx/logger.h>
+
+#include "util.h"
+
+static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
+
+class MessageFilterConfiguration : public Hedwig::Configuration {
+public:
+ MessageFilterConfiguration() : address("localhost:4081") {}
+
+ virtual int getInt(const std::string& key, int defaultVal) const {
+ return defaultVal;
+ }
+
+ virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
+ if (key == Configuration::DEFAULT_SERVER) {
+ return address;
+ } else {
+ return defaultVal;
+ }
+ }
+
+ virtual bool getBool(const std::string& key, bool defaultVal) const {
+ if (key == Configuration::SUBSCRIBER_AUTOCONSUME) {
+ return false;
+ } else {
+ return defaultVal;
+ }
+ }
+
+ protected:
+ const std::string address;
+};
+
+class ModMessageFilter : public Hedwig::ClientMessageFilter {
+public:
+ ModMessageFilter() : mod(0) {
+ }
+
+ virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+ const Hedwig::SubscriptionPreferencesPtr& preferences) {
+ if (!preferences->has_options()) {
+ return;
+ }
+
+ const Hedwig::Map& userOptions = preferences->options();
+ int numOpts = userOptions.entries_size();
+ for (int i=0; i<numOpts; i++) {
+ const Hedwig::Map_Entry& opt = userOptions.entries(i);
+ const std::string& key = opt.key();
+ if ("MOD" != key) {
+ continue;
+ }
+ const std::string& value = opt.value();
+ mod = atoi(value.c_str());
+ break;
+ }
+ return;
+ }
+ virtual bool testMessage(const Hedwig::Message& message) {
+ int value = atoi(message.body().c_str());
+ return 0 == value % mod;
+ }
+private:
+ int mod;
+};
+
+class GapCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
+public:
+ GapCheckingMessageHandlerCallback(Hedwig::Subscriber& sub,
+ const int start, const int nextValue,
+ const int gap, bool doConsume)
+ : sub(sub), start(start), nextValue(nextValue), gap(gap), doConsume(doConsume) {
+ }
+
+ virtual void consume(const std::string& topic, const std::string& subscriberId,
+ const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
+ boost::lock_guard<boost::mutex> lock(mutex);
+
+ int value = atoi(msg.body().c_str());
+ if(value > start) {
+ LOG4CXX_DEBUG(logger, "received message " << value);
+ if (value == nextValue) {
+ nextValue += gap;
+ }
+ }
+ callback->operationComplete();
+ if (doConsume) {
+ sub.consume(topic, subscriberId, msg.msgid());
+ }
+ }
+
+ int nextExpected() {
+ return nextValue;
+ }
+
+protected:
+ boost::mutex mutex;
+ Hedwig::Subscriber& sub;
+ int start;
+ int nextValue;
+ int gap;
+ bool doConsume;
+};
+
+void publishNums(Hedwig::Publisher& pub, const std::string& topic,
+ int start, int num, int M) {
+ for (int i=1; i<=num; i++) {
+ int value = start + i;
+ int mod = value % M;
+
+ std::stringstream valSS;
+ valSS << value;
+
+ std::stringstream modSS;
+ modSS << mod;
+
+ Hedwig::Message msg;
+ msg.set_body(valSS.str());
+ Hedwig::MessageHeader* header = msg.mutable_header();
+ Hedwig::Map* properties = header->mutable_properties();
+ Hedwig::Map_Entry* entry = properties->add_entries();
+ entry->set_key("mod");
+ entry->set_value(modSS.str());
+
+ pub.publish(topic, msg);
+ }
+}
+
+void receiveNumModM(Hedwig::Subscriber& sub,
+ const std::string& topic, const std::string& subid,
+ int start, int num, int M, bool consume) {
+ Hedwig::SubscriptionOptions options;
+ options.set_createorattach(Hedwig::SubscribeRequest::ATTACH);
+ Hedwig::Map* userOptions = options.mutable_options();
+ Hedwig::Map_Entry* opt = userOptions->add_entries();
+ opt->set_key("MOD");
+
+ std::stringstream modSS;
+ modSS << M;
+ opt->set_value(modSS.str());
+
+ sub.subscribe(topic, subid, options);
+
+ int base = start + M - start % M;
+ int end = base + num * M;
+
+ GapCheckingMessageHandlerCallback * cb =
+ new GapCheckingMessageHandlerCallback(sub, start, base, M, consume);
+ Hedwig::MessageHandlerCallbackPtr handler(cb);
+ Hedwig::ClientMessageFilterPtr filter(new ModMessageFilter());
+
+ sub.startDeliveryWithFilter(topic, subid, handler, filter);
+
+ for (int i = 0; i < 100; i++) {
+ if (cb->nextExpected() == end) {
+ break;
+ } else {
+ sleep(1);
+ }
+ }
+ ASSERT_TRUE(cb->nextExpected() == end);
+
+ sub.stopDelivery(topic, subid);
+ sub.closeSubscription(topic, subid);
+}
+
+TEST(MessageFilterTest, testNullMessageFilter) {
+ Hedwig::Configuration* conf = new MessageFilterConfiguration();
+ std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+ Hedwig::Client* client = new Hedwig::Client(*conf);
+ std::auto_ptr<Hedwig::Client> clientptr(client);
+
+ Hedwig::Subscriber& sub = client->getSubscriber();
+
+ std::string topic = "testNullMessageFilter";
+ std::string subid = "myTestSubid";
+
+ sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+
+ GapCheckingMessageHandlerCallback * cb =
+ new GapCheckingMessageHandlerCallback(sub, 0, 0, 0, true);
+ Hedwig::MessageHandlerCallbackPtr handler(cb);
+ Hedwig::ClientMessageFilterPtr filter(new ModMessageFilter());
+
+ ASSERT_THROW(sub.startDeliveryWithFilter(topic, subid, handler,
+ Hedwig::ClientMessageFilterPtr()),
+ Hedwig::NullMessageFilterException);
+ ASSERT_THROW(sub.startDeliveryWithFilter(topic, subid,
+ Hedwig::MessageHandlerCallbackPtr(), filter),
+ Hedwig::NullMessageHandlerException);
+}
+
+TEST(MessageFilterTest, testMessageFilter) {
+ Hedwig::Configuration* conf = new MessageFilterConfiguration();
+ std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+ Hedwig::Client* client = new Hedwig::Client(*conf);
+ std::auto_ptr<Hedwig::Client> clientptr(client);
+
+ Hedwig::Subscriber& sub = client->getSubscriber();
+ Hedwig::Publisher& pub = client->getPublisher();
+
+ std::string topic = "testMessageFilter";
+ std::string subid = "myTestSubid";
+ sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+ sub.closeSubscription(topic, subid);
+
+ publishNums(pub, topic, 0, 100, 2);
+ receiveNumModM(sub, topic, subid, 0, 50, 2, true);
+}
+
+TEST(MessageFilterTest, testUpdateMessageFilter) {
+ Hedwig::Configuration* conf = new MessageFilterConfiguration();
+ std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+ Hedwig::Client* client = new Hedwig::Client(*conf);
+ std::auto_ptr<Hedwig::Client> clientptr(client);
+
+ Hedwig::Subscriber& sub = client->getSubscriber();
+ Hedwig::Publisher& pub = client->getPublisher();
+
+ std::string topic = "testUpdateMessageFilter";
+ std::string subid = "myTestSubid";
+
+ sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+ sub.closeSubscription(topic, subid);
+
+ publishNums(pub, topic, 0, 100, 2);
+ receiveNumModM(sub, topic, subid, 0, 50, 2, false);
+ receiveNumModM(sub, topic, subid, 0, 25, 4, false);
+ receiveNumModM(sub, topic, subid, 0, 33, 3, false);
+}