You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/03 17:27:12 UTC

svn commit: r1380273 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/test/

Author: ivank
Date: Mon Sep  3 15:27:11 2012
New Revision: 1380273

URL: http://svn.apache.org/viewvc?rev=1380273&view=rev
Log:
BOOKKEEPER-335: client-side message filter for cpp client. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
      - copied, changed from r1380268, zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Sep  3 15:27:11 2012
@@ -136,6 +136,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-334: client-side message filter for java client. (sijie via ivank)
 
+        BOOKKEEPER-335: client-side message filter for cpp client. (sijie via ivank)
+
 Release 4.1.0 - 2012-06-07
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h Mon Sep  3 15:27:11 2012
@@ -55,6 +55,18 @@ namespace Hedwig {
     virtual ~MessageHandlerCallback() {};
   };
   typedef std::tr1::shared_ptr<MessageHandlerCallback> MessageHandlerCallbackPtr;
+
+  typedef std::tr1::shared_ptr<SubscriptionPreferences> SubscriptionPreferencesPtr;
+
+  class ClientMessageFilter {  // interface for filtering messages on the client before delivery
+  public:
+    virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+                                            const SubscriptionPreferencesPtr& preferences) = 0;  // receives the subscription's preferences
+    virtual bool testMessage(const Message& message) = 0;  // true: deliver the message; false: skip it
+    // Virtual destructor for the polymorphic base.
+    virtual ~ClientMessageFilter() {};
+  };
+  typedef std::tr1::shared_ptr<ClientMessageFilter> ClientMessageFilterPtr;
 }
 
 #endif

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h Mon Sep  3 15:27:11 2012
@@ -39,6 +39,8 @@ namespace Hedwig {
   class SubscriberException : public ClientException { };
   class AlreadySubscribedException : public SubscriberException {};
   class NotSubscribedException : public SubscriberException {};
+  class NullMessageHandlerException : public SubscriberException {};  // a null message handler was supplied
+  class NullMessageFilterException : public SubscriberException {};  // a null message filter was supplied
 
   class ConfigurationException : public ClientException { };
   class InvalidPortException : public ConfigurationException {};

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Mon Sep  3 15:27:11 2012
@@ -42,7 +42,13 @@ namespace Hedwig {
 
     virtual void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) = 0;
 
-    virtual void startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) = 0;
+    virtual void startDelivery(const std::string& topic, const std::string& subscriberId,
+                               const MessageHandlerCallbackPtr& callback) = 0;
+    virtual void startDeliveryWithFilter(const std::string& topic,
+                                         const std::string& subscriberId,
+                                         const MessageHandlerCallbackPtr& callback,
+                                         const ClientMessageFilterPtr& filter) = 0;
+
     virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0;
 
     virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am Mon Sep  3 15:27:11 2012
@@ -19,7 +19,7 @@
 PROTODEF = ../../../../../hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
 
 lib_LTLIBRARIES = libhedwig01.la
-libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp
+libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp
 libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS)
 libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS) 
 libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp?rev=1380273&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp Mon Sep  3 15:27:11 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "filterablemessagehandler.h"
+
+using namespace Hedwig;
+
+FilterableMessageHandler::FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
+                                                   const ClientMessageFilterPtr& msgFilter)
+  : msgHandler(msgHandler), msgFilter(msgFilter) { // stores copies of both shared pointers
+}
+// Empty destructor body: no explicit cleanup is written here.
+FilterableMessageHandler::~FilterableMessageHandler() {
+}
+
+void FilterableMessageHandler::consume(const std::string& topic, const std::string& subscriberId,
+                                       const Message& msg, OperationCallbackPtr& callback) {
+  // A missing filter accepts every message; otherwise the filter decides.
+  const bool accepted = (0 == msgFilter.get()) || msgFilter->testMessage(msg);
+  if (!accepted) {
+    // Rejected: report completion without invoking the wrapped handler.
+    callback->operationComplete();
+    return;
+  }
+  // Accepted: pass the message on to the wrapped handler unchanged.
+  msgHandler->consume(topic, subscriberId, msg, callback);
+}

Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h (from r1380268, zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h&r1=1380268&r2=1380273&rev=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h Mon Sep  3 15:27:11 2012
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef HEDWIG_CALLBACK_H
-#define HEDWIG_CALLBACK_H
+#ifndef FILTERABLE_MESSAGE_HANDLER_H
+#define FILTERABLE_MESSAGE_HANDLER_H
 
-#include <string>
-#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
 #include <hedwig/protocol.h>
 
 #ifdef USE_BOOST_TR1
@@ -30,31 +29,21 @@
 
 namespace Hedwig {
 
-  template<class R>
-  class Callback {
+  class FilterableMessageHandler : public MessageHandlerCallback {
   public:
-    virtual void operationComplete(const R& result) = 0;
-    virtual void operationFailed(const std::exception& exception) = 0;
+    FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
+                             const ClientMessageFilterPtr& msgFilter);
 
-    virtual ~Callback() {};
-  };
+    virtual void consume(const std::string& topic, const std::string& subscriberId,
+                         const Message& msg, OperationCallbackPtr& callback);
 
-  class OperationCallback {
-  public:
-    virtual void operationComplete() = 0;
-    virtual void operationFailed(const std::exception& exception) = 0;
-    
-    virtual ~OperationCallback() {};
+    virtual ~FilterableMessageHandler();
+  private:
+    const MessageHandlerCallbackPtr msgHandler;
+    const ClientMessageFilterPtr msgFilter;
   };
-  typedef std::tr1::shared_ptr<OperationCallback> OperationCallbackPtr;
 
-  class MessageHandlerCallback {
-  public:
-    virtual void consume(const std::string& topic, const std::string& subscriberId, const Message& msg, OperationCallbackPtr& callback) = 0;
-    
-    virtual ~MessageHandlerCallback() {};
-  };
-  typedef std::tr1::shared_ptr<MessageHandlerCallback> MessageHandlerCallbackPtr;
-}
+};
 
 #endif
+

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Mon Sep  3 15:27:11 2012
@@ -22,6 +22,7 @@
 #include "subscriberimpl.h"
 #include "util.h"
 #include "channel.h"
+#include "filterablemessagehandler.h"
 
 #include <boost/asio.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
@@ -405,7 +406,28 @@ void SubscriberImpl::consume(const std::
   channel->writeRequest(data->getRequest(), writecb);
 }
 
-void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) {
+// Starts delivery with filter applied client-side; throws on a null filter/callback or an unknown subscription.
+void SubscriberImpl::startDeliveryWithFilter(const std::string& topic, const std::string& subscriberId,
+                                             const MessageHandlerCallbackPtr& callback,
+                                             const ClientMessageFilterPtr& filter) {
+  if (0 == filter.get()) {
+    throw NullMessageFilterException();
+  }
+  if (0 == callback.get()) {
+    throw NullMessageHandlerException();
+  }
+  // Take our own copy: the getter returns a reference into the shared map, usable after its lock is gone.
+  const SubscriptionPreferencesPtr preferences = getSubscriptionPreferences(topic, subscriberId);
+  if (0 == preferences.get()) {
+    throw NotSubscribedException();
+  }
+  filter->setSubscriptionPreferences(topic, subscriberId, preferences);
+  MessageHandlerCallbackPtr filterableHandler(new FilterableMessageHandler(callback, filter));
+  startDelivery(topic, subscriberId, filterableHandler);
+}
+
+void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId,
+                                   const MessageHandlerCallbackPtr& callback) {
   TopicSubscriber t(topic, subscriberId);
 
   boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
@@ -446,6 +468,22 @@ void SubscriberImpl::closeSubscription(c
   }
 }
 
+void SubscriberImpl::setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+                                                const SubscriptionPreferences& preferences) {
+  // Store a private copy of the preferences, replacing any earlier entry for this subscription.
+  SubscriptionPreferencesPtr snapshot(new SubscriptionPreferences(preferences));
+  boost::lock_guard<boost::shared_mutex> guard(topicsubscriber2preferences_lock);
+  topicsubscriber2preferences[TopicSubscriber(topic, subscriberId)] = snapshot;
+}
+
+const SubscriptionPreferencesPtr& SubscriberImpl::getSubscriptionPreferences(
+    const std::string& topic, const std::string& subscriberId) {
+  // operator[] inserts a null entry for an unknown subscription, so a miss yields a null pointer.
+  // The returned reference aliases the map entry and is used by callers after the lock is released.
+  boost::lock_guard<boost::shared_mutex> guard(topicsubscriber2preferences_lock);
+  return topicsubscriber2preferences[TopicSubscriber(topic, subscriberId)];
+}
+
 /**
    takes ownership of txn
 */
@@ -459,6 +497,19 @@ void SubscriberImpl::messageHandler(cons
 
   switch (m->statuscode()) {
   case SUCCESS:
+    // for a subscribe request, check whether any subscription preferences were received
+    if (SUBSCRIBE == txn->getType()) {
+      if (m->has_responsebody()) {
+        const ResponseBody& respBody = m->responsebody();
+        if (respBody.has_subscriberesponse()) {
+          const SubscribeResponse& resp = respBody.subscriberesponse();
+          if (resp.has_preferences()) {
+            setSubscriptionPreferences(txn->getTopic(), txn->getSubscriberId(),
+                                       resp.preferences());
+          }
+        }
+      }
+    }
     if (m->has_responsebody()) {
       txn->getCallback()->operationComplete(m->responsebody());
     } else {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Mon Sep  3 15:27:11 2012
@@ -153,7 +153,11 @@ namespace Hedwig {
 
     void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId);
 
-    void startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback);
+    void startDelivery(const std::string& topic, const std::string& subscriberId,
+                       const MessageHandlerCallbackPtr& callback);
+    void startDeliveryWithFilter(const std::string& topic, const std::string& subscriberId,
+                                 const MessageHandlerCallbackPtr& callback,
+                                 const ClientMessageFilterPtr& filter);
     void stopDelivery(const std::string& topic, const std::string& subscriberId);
 
     void closeSubscription(const std::string& topic, const std::string& subscriberId);
@@ -164,10 +168,18 @@ namespace Hedwig {
     void doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
 
   private:
+    void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+                                    const SubscriptionPreferences& preferences);
+    const SubscriptionPreferencesPtr& getSubscriptionPreferences(
+                                    const std::string& topic, const std::string& subscriberId);
+
+  private:
     const ClientImplPtr client;
-    
+
     std::tr1::unordered_map<TopicSubscriber, SubscriberClientChannelHandlerPtr, TopicSubscriberHash > topicsubscriber2handler;
     boost::shared_mutex topicsubscriber2handler_lock;	    
+    std::tr1::unordered_map<TopicSubscriber, SubscriptionPreferencesPtr, TopicSubscriberHash> topicsubscriber2preferences;
+    boost::shared_mutex topicsubscriber2preferences_lock;	    
   };
 
 };

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am?rev=1380273&r1=1380272&r2=1380273&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am Mon Sep  3 15:27:11 2012
@@ -18,7 +18,7 @@
 
 if HAVE_GTEST
 bin_PROGRAMS = hedwigtest
-hedwigtest_SOURCES = main.cpp utiltest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp
+hedwigtest_SOURCES = main.cpp utiltest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp messagefiltertest.cpp
 hedwigtest_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) $(GTEST_CPPFLAGS) $(BOOST_CPPFLAGS)
 hedwigtest_CXXFLAGS = $(GTEST_CXXFLAGS)
 hedwigtest_LDADD = $(DEPS_LIBS) $(GTEST_LIBS) -L$(top_builddir)/lib -lhedwig01

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp?rev=1380273&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp Mon Sep  3 15:27:11 2012
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "gtest/gtest.h"
+
+#include "../lib/clientimpl.h"
+#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
+#include <stdexcept>
+#include <pthread.h>
+
+#include <log4cxx/logger.h>
+
+#include "util.h"
+
+static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig." __FILE__));  // space needed: C++11 reads "x"__FILE__ as a literal suffix
+
+// Configuration used by the message filter tests: fixed server address, auto-consume off,
+// and the caller-supplied default for everything else.
+class MessageFilterConfiguration : public Hedwig::Configuration {
+public:
+  MessageFilterConfiguration() : address("localhost:4081") {}
+
+  // No integer setting is overridden.
+  virtual int getInt(const std::string& key, int defaultVal) const {
+    return defaultVal;
+  }
+
+  // Only the default-server key maps to the fixed address.
+  virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
+    return (key == Configuration::DEFAULT_SERVER) ? address : defaultVal;
+  }
+
+  // Auto-consume is forced off; other flags keep the supplied default.
+  virtual bool getBool(const std::string& key, bool defaultVal) const {
+    if (key == Configuration::SUBSCRIBER_AUTOCONSUME) {
+      return false;
+    }
+    return defaultVal;
+  }
+
+protected:
+  const std::string address;
+};
+    
+// Test filter that accepts messages whose numeric body is a multiple of "MOD", a user option
+// read from the subscription preferences.
+class ModMessageFilter : public Hedwig::ClientMessageFilter {
+public:
+  ModMessageFilter() : mod(0) {
+  }
+  // Picks up the first "MOD" option, if any; otherwise mod keeps its previous value.
+  virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+                                          const Hedwig::SubscriptionPreferencesPtr& preferences) {
+    if (!preferences->has_options()) {
+      return;
+    }
+    const Hedwig::Map& userOptions = preferences->options();
+    const int numOpts = userOptions.entries_size();
+    for (int i = 0; i < numOpts; ++i) {
+      const Hedwig::Map_Entry& opt = userOptions.entries(i);
+      if ("MOD" == opt.key()) {
+        mod = atoi(opt.value().c_str());
+        break;
+      }
+    }
+  }
+  // With no usable modulus (mod == 0) every message passes; this also avoids a modulo by zero.
+  virtual bool testMessage(const Hedwig::Message& message) {
+    if (0 == mod) {
+      return true;
+    }
+    return 0 == atoi(message.body().c_str()) % mod;
+  }
+private:
+  int mod;
+};
+
+// Test handler tracking an arithmetic sequence: when a value greater than start equals
+// nextValue, nextValue advances by gap. Shared state is guarded by mutex.
+class GapCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
+public:
+  GapCheckingMessageHandlerCallback(Hedwig::Subscriber& sub, const int start,
+                                    const int nextValue, const int gap, bool doConsume)
+    : sub(sub), start(start), nextValue(nextValue), gap(gap), doConsume(doConsume) {
+  }
+  // Completes callback for every message and, when doConsume is set, sends a consume for its id.
+  virtual void consume(const std::string& topic, const std::string& subscriberId,
+                       const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
+    boost::lock_guard<boost::mutex> lock(mutex);
+    const int value = atoi(msg.body().c_str());
+    if (value > start) {
+      LOG4CXX_DEBUG(logger, "received message " << value);
+      if (value == nextValue) {
+        nextValue += gap;
+      }
+    }
+    callback->operationComplete();
+    if (doConsume) {
+      sub.consume(topic, subscriberId, msg.msgid());
+    }
+  }
+  // Takes the same mutex as consume(), which updates nextValue while the test polls this value.
+  int nextExpected() {
+    boost::lock_guard<boost::mutex> lock(mutex);
+    return nextValue;
+  }
+protected:
+  boost::mutex mutex;
+  Hedwig::Subscriber& sub;
+  int start;
+  int nextValue;
+  int gap;
+  bool doConsume;
+};
+
+// Publishes the values start+1 .. start+num on topic; each message carries its value as the
+// body and value % M in a "mod" header property.
+void publishNums(Hedwig::Publisher& pub, const std::string& topic,
+                 int start, int num, int M) {
+  for (int offset = 1; offset <= num; ++offset) {
+    const int number = start + offset;
+
+    std::stringstream bodyText;
+    bodyText << number;
+    std::stringstream remainderText;
+    remainderText << (number % M);
+
+    Hedwig::Message outgoing;
+    outgoing.set_body(bodyText.str());
+    // Attach the remainder as a header property.
+    Hedwig::Map_Entry* property =
+      outgoing.mutable_header()->mutable_properties()->add_entries();
+    property->set_key("mod");
+    property->set_value(remainderText.str());
+
+    pub.publish(topic, outgoing);
+  }
+}
+
+// Attaches subid to topic with a "MOD" = M user option, starts filtered delivery and polls
+// (at most 100 times, one second apart) until the handler has matched num multiples of M.
+// Delivery is then stopped and the subscription closed.
+void receiveNumModM(Hedwig::Subscriber& sub,
+                    const std::string& topic, const std::string& subid,
+                    int start, int num, int M, bool consume) {
+  std::stringstream modText;
+  modText << M;
+
+  Hedwig::SubscriptionOptions options;
+  options.set_createorattach(Hedwig::SubscribeRequest::ATTACH);
+  Hedwig::Map_Entry* modOption = options.mutable_options()->add_entries();
+  modOption->set_key("MOD");
+  modOption->set_value(modText.str());
+  sub.subscribe(topic, subid, options);
+
+  // firstMultiple is the next multiple of M above start (for non-negative start);
+  // finalExpected is what the handler reports once num consecutive multiples have matched.
+  const int firstMultiple = start + M - start % M;
+  const int finalExpected = firstMultiple + num * M;
+
+  // handler owns the callback object; checker is a non-owning alias used for polling.
+  GapCheckingMessageHandlerCallback* checker =
+    new GapCheckingMessageHandlerCallback(sub, start, firstMultiple, M, consume);
+  Hedwig::MessageHandlerCallbackPtr handler(checker);
+  Hedwig::ClientMessageFilterPtr filter(new ModMessageFilter());
+  sub.startDeliveryWithFilter(topic, subid, handler, filter);
+
+  // Stop polling as soon as the expected value is reached.
+  for (int attempt = 0; attempt < 100 && checker->nextExpected() != finalExpected; ++attempt) {
+    sleep(1);
+  }
+  ASSERT_TRUE(checker->nextExpected() == finalExpected);
+
+  sub.stopDelivery(topic, subid);
+  sub.closeSubscription(topic, subid);
+}
+
+// startDeliveryWithFilter must reject a null filter and a null handler, each with its own
+// exception type.
+TEST(MessageFilterTest, testNullMessageFilter) {
+  std::auto_ptr<Hedwig::Configuration> confptr(new MessageFilterConfiguration());
+  std::auto_ptr<Hedwig::Client> clientptr(new Hedwig::Client(*confptr));
+  Hedwig::Subscriber& sub = clientptr->getSubscriber();
+
+  const std::string topic = "testNullMessageFilter";
+  const std::string subid = "myTestSubid";
+  sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+
+  // Valid arguments, paired below with a null counterpart.
+  Hedwig::MessageHandlerCallbackPtr handler(
+    new GapCheckingMessageHandlerCallback(sub, 0, 0, 0, true));
+  Hedwig::ClientMessageFilterPtr filter(new ModMessageFilter());
+  const Hedwig::ClientMessageFilterPtr noFilter;
+  const Hedwig::MessageHandlerCallbackPtr noHandler;
+
+  // Null filter with a valid handler.
+  ASSERT_THROW(sub.startDeliveryWithFilter(topic, subid, handler, noFilter),
+               Hedwig::NullMessageFilterException);
+
+  // Null handler with a valid filter.
+  ASSERT_THROW(sub.startDeliveryWithFilter(topic, subid, noHandler, filter),
+               Hedwig::NullMessageHandlerException);
+}
+
+// Publishes 1..100 and expects a subscriber filtering on MOD=2 to match the 50 even values.
+TEST(MessageFilterTest, testMessageFilter) {
+  std::auto_ptr<Hedwig::Configuration> confptr(new MessageFilterConfiguration());
+  std::auto_ptr<Hedwig::Client> clientptr(new Hedwig::Client(*confptr));
+  Hedwig::Subscriber& sub = clientptr->getSubscriber();
+  Hedwig::Publisher& pub = clientptr->getPublisher();
+
+  const std::string topic = "testMessageFilter";
+  const std::string subid = "myTestSubid";
+
+  // Create the subscription before publishing, then close it; receiveNumModM attaches again.
+  sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+  sub.closeSubscription(topic, subid);
+
+  // Messages are acknowledged with consume (last argument true).
+  publishNums(pub, topic, 0, 100, 2);
+  receiveNumModM(sub, topic, subid, 0, 50, 2, true);
+}
+
+// Attaches to the same subscription three times with a different MOD option each time and
+// expects the filter to follow: 50 multiples of 2, then 25 of 4, then 33 of 3 out of 1..100.
+TEST(MessageFilterTest, testUpdateMessageFilter) {
+  std::auto_ptr<Hedwig::Configuration> confptr(new MessageFilterConfiguration());
+  std::auto_ptr<Hedwig::Client> clientptr(new Hedwig::Client(*confptr));
+  Hedwig::Subscriber& sub = clientptr->getSubscriber();
+  Hedwig::Publisher& pub = clientptr->getPublisher();
+
+  const std::string topic = "testUpdateMessageFilter";
+  const std::string subid = "myTestSubid";
+  sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+  sub.closeSubscription(topic, subid);
+
+  publishNums(pub, topic, 0, 100, 2);
+
+  // consume is false in every round; presumably this lets each attach see the messages
+  // again (depends on server behaviour not visible in this file).
+  receiveNumModM(sub, topic, subid, 0, 50, 2, false);
+  receiveNumModM(sub, topic, subid, 0, 25, 4, false);
+  receiveNumModM(sub, topic, subid, 0, 33, 3, false);
+}