Posted to dev@bookkeeper.apache.org by "Sijie Guo (JIRA)" <ji...@apache.org> on 2012/07/09 16:48:34 UTC

[jira] [Commented] (BOOKKEEPER-321) Message Filter Support

    [ https://issues.apache.org/jira/browse/BOOKKEEPER-321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13409534#comment-13409534 ] 

Sijie Guo commented on BOOKKEEPER-321:
--------------------------------------

Message Filter Support in Hedwig:

1) Provide a message header, *MessageHeader*, in *Message* (BOOKKEEPER-78).

System properties are defined as fields in the protobuf definition.
User properties are defined as a key/bytes map, in which applications can put their own customized data.

{code}
+// common structure to store header or properties
+message Map {
+    message Entry {
+        optional string key  = 1;
+        optional bytes value = 2;
+    }
+    repeated Entry entries = 1;
+}
+
+// message header
+message MessageHeader {
+    // user customized fields used for message filter
+    optional Map properties = 1;
+    // following are system properties in message header
+    optional string messageType = 2;
+}
@@ -30,6 +47,8 @@ message Message {
     required bytes body = 1;
     optional bytes srcRegion = 2;
     optional MessageSeqId msgId = 3;
+    // message header
+    optional MessageHeader header = 4;
 }
{code}
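
Because *Map* is a repeated list of key/bytes entries rather than a native map type, reading a user property means scanning the entries. Below is a minimal plain-Java sketch of that lookup. *MapEntry* and *HeaderSketch* are hypothetical stand-ins for the generated protobuf classes. The sketch assumes the first matching key wins, since the definition above does not say how duplicate keys are handled.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the protobuf Map.Entry message (key/bytes pair).
final class MapEntry {
    final String key;
    final byte[] value;

    MapEntry(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for MessageHeader: user properties plus the
// system-defined messageType field.
final class HeaderSketch {
    final List<MapEntry> properties = new ArrayList<>();
    String messageType; // null models the unset optional field

    // Append a user property; like a repeated field, duplicate keys are allowed.
    HeaderSketch put(String key, String value) {
        properties.add(new MapEntry(key, value.getBytes(StandardCharsets.UTF_8)));
        return this;
    }

    // Linear scan over the repeated entries; the first match wins (assumption).
    Optional<byte[]> getProperty(String key) {
        for (MapEntry e : properties) {
            if (e.key.equals(key)) {
                return Optional.of(e.value);
            }
        }
        return Optional.empty();
    }
}
```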

2) Add preferences for a subscription (BOOKKEEPER-332).

*SubscriptionPreferences* is a structure that stores all preferences for a subscription. The preferences are passed in *SubscriptionOptions*, sent to the hub server and stored in the meta store.

System options are defined as fields of *SubscriptionPreferences* in the protobuf definition. Currently there are two system options: *messageBound* limits the number of messages to receive for a subscription, and *messageFilter* specifies the message filter to run on the server side.

User options are defined as a key/bytes map, in which applications can put their own customized options.

{code}
+// record all preferences for a subscription,
+// would be serialized to be stored in meta store
+message SubscriptionPreferences {
+    // user customized subscription options
+    optional Map options = 1;
+
+    ///
+    /// system defined options
+    ///
+
+    // message bound
+    optional uint32 messageBound = 2;
+    // server-side message filter
+    optional string messageFilter = 3;
+}
+

 message SubscribeRequest{
     required bytes subscriberId = 2;

@@ -100,12 +135,22 @@ message SubscribeRequest{

        // wait for cross-regional subscriptions to be established before returning
        optional bool synchronous = 4 [default = false];
+        // @Deprecated. set message bound in SubscriptionPreferences
        optional uint32 messageBound = 5;
+
+        // subscription options
+        optional SubscriptionPreferences preferences = 6;
 }

+// used in client only
+// options are stored in SubscriptionPreferences structure
 message SubscriptionOptions {
     optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
     optional uint32 messageBound = 3 [default = 0];
+    // user customized subscription options
+    optional Map options = 4;
+    // server-side message filter
+    optional string messageFilter = 5;
 }

 message SubscriptionState {
     required MessageSeqId msgId = 1;
+    // @Deprecated.
+    // It is a bad idea to put fields that don't change frequently
+    // together with fields that change frequently
+    // so move it to subscription info structure
     optional uint32 messageBound = 2;
 }

+message SubscriptionData {
+    optional SubscriptionState state = 1;
+    optional SubscriptionPreferences preferences = 2;
+}
+
{code}
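
The client-only *SubscriptionOptions* have to be turned into the *SubscriptionPreferences* that travel to the hub server and are stored in the meta store. The sketch below shows one way that copy could look. *OptionsSketch*, *PreferencesSketch* and *PreferencesBuilder.fromOptions* are hypothetical. Treating messageBound == 0 (the proto default) as "no bound, leave unset" is an assumption, not something the patch states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the client-only SubscriptionOptions message.
final class OptionsSketch {
    int messageBound = 0;                        // proto default is 0
    final Map<String, byte[]> options = new HashMap<>();
    String messageFilter;                        // null models an unset field
}

// Hypothetical stand-in for SubscriptionPreferences; nullable fields model
// protobuf "optional" presence.
final class PreferencesSketch {
    final Map<String, byte[]> options = new HashMap<>();
    Integer messageBound;
    String messageFilter;
}

final class PreferencesBuilder {
    // Copy client-side options into the preferences sent to the hub server.
    // Leaving messageBound unset when it is 0 is an assumption of this sketch.
    static PreferencesSketch fromOptions(OptionsSketch opts) {
        PreferencesSketch prefs = new PreferencesSketch();
        prefs.options.putAll(opts.options);
        if (opts.messageBound > 0) {
            prefs.messageBound = opts.messageBound;
        }
        if (opts.messageFilter != null) {
            prefs.messageFilter = opts.messageFilter;
        }
        return prefs;
    }
}
```

Keeping these rarely changing preferences in *SubscriptionData* next to, rather than inside, *SubscriptionState* follows the comment in the diff: state changes often, preferences seldom.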

The subscription preferences are returned in the response body of a subscribe request, so that they can be passed to a message filter running on the client side.

{code}
+message SubscribeResponse {
+    optional SubscriptionPreferences preferences = 2;
+}
+
+message ResponseBody {
+    optional SubscribeResponse subscribeResponse = 2;
+}
+
 message PubSubResponse{
     required ProtocolVersion protocolVersion = 1;
     required StatusCode statusCode = 2;
@@ -142,8 +195,10 @@ message PubSubResponse{
     optional Message message = 5;
     optional bytes topic = 6;
     optional bytes subscriberId = 7;
-}

+    // the following fields are sent by other requests
+    optional ResponseBody respBody = 8;
+}
{code}
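
Since respBody, subscribeResponse and preferences are all optional, a client has to cope with any of them being absent, for example in a response to a request other than subscribe. Below is a hedged sketch of that navigation with java.util.Optional. The stub classes are hypothetical replacements for the generated ones, and a null field models an unset optional field.

```java
import java.util.Optional;

// Hypothetical stand-ins for the nested optional response messages.
final class PrefsStub {
    final String messageFilter;

    PrefsStub(String messageFilter) {
        this.messageFilter = messageFilter;
    }
}

final class SubscribeResponseStub {
    PrefsStub preferences;
}

final class ResponseBodyStub {
    SubscribeResponseStub subscribeResponse;
}

final class PubSubResponseStub {
    ResponseBodyStub respBody;
}

final class ResponseReader {
    // Walk respBody -> subscribeResponse -> preferences; Optional.map yields
    // empty as soon as any level is missing.
    static Optional<PrefsStub> preferencesOf(PubSubResponseStub resp) {
        return Optional.ofNullable(resp.respBody)
                .map(b -> b.subscribeResponse)
                .map(s -> s.preferences);
    }
}
```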

3) Message filter (BOOKKEEPER-333)

*MessageFilter* is defined below; it extends the original internal interface.

#initialize and #uninitialize set up and tear down a message filter running on the server side.

#setSubscriptionPreferences passes the subscription preferences to the message filter when the filter starts running, on either the server side or the client side, so the filter can read the subscriber's preferences and use them for filtering.

#testMessage answers yes or no for a given message: only messages for which it returns true are delivered to the subscriber.

{code}
+public interface MessageFilter {
+
+    /**
+     * Initialize the message filter.
+     *
+     * @param conf
+     *          Configuration object. A <i>MessageFilter</i> might read settings from it.
+     * @return message filter
+     * @throws ConfigurationException when the configuration is invalid
+     * @throws IOException when the message filter fails to initialize
+     */
+    public MessageFilter initialize(Configuration conf)
+    throws ConfigurationException, IOException;
+
+    /**
+     * Uninitialize the message filter.
+     */
+    public void uninitialize();
+
+    /**
+     * Set subscription preferences.
+     *
+     * <code>preferences</code> of the subscriber will be passed to message filter when
+     * the message filter attaches to its subscription either in server-side or client-side.
+     *
+     * @param topic
+     *          Topic Name.
+     * @param subscriberId
+     *          Subscriber Id.
+     * @param preferences
+     *          Subscription Preferences.
+     * @return message filter
+     */
+    public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+                                                    SubscriptionPreferences preferences);
+
+    /**
+     * Tests whether a particular message passes the filter or not.
+     *
+     * @param message
+     *          Message to test.
+     * @return true if the message passes the filter and may be delivered to the subscriber
+     */
+    public boolean testMessage(Message message);
+}
{code}
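
To illustrate the lifecycle described above (initialize, then setSubscriptionPreferences, then testMessage for each message, and finally uninitialize), here is a small filter written against a simplified mirror of the interface. This is a sketch, not the real API: plain Strings and Maps replace ByteString, Configuration and SubscriptionPreferences, and the checked exceptions are dropped. The "region" property/option pair is a made-up example of application-defined data.

```java
import java.util.Map;

// Simplified stand-in for a Message: only user header properties are modelled.
final class Msg {
    final Map<String, String> properties;

    Msg(Map<String, String> properties) {
        this.properties = properties;
    }
}

// Simplified mirror of MessageFilter (hypothetical types, not the real API).
interface SimpleFilter {
    SimpleFilter initialize(Map<String, String> conf);
    void uninitialize();
    SimpleFilter setSubscriptionPreferences(String topic, String subscriberId,
                                            Map<String, String> options);
    boolean testMessage(Msg message);
}

// Hypothetical filter: passes a message only when its "region" header property
// equals the subscriber's "region" option; with no option set, all messages pass.
final class RegionFilter implements SimpleFilter {
    private String wantedRegion; // null means "accept all"

    @Override
    public SimpleFilter initialize(Map<String, String> conf) {
        return this; // nothing to load for this filter
    }

    @Override
    public void uninitialize() {
        wantedRegion = null;
    }

    @Override
    public SimpleFilter setSubscriptionPreferences(String topic, String subscriberId,
                                                   Map<String, String> options) {
        wantedRegion = options.get("region");
        return this;
    }

    @Override
    public boolean testMessage(Msg message) {
        return wantedRegion == null || wantedRegion.equals(message.properties.get("region"));
    }
}
```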

4) Server-side filter (BOOKKEEPER-333)

When subscribing, a subscriber sets the message filter class name in its subscription preferences. The hub server instantiates the message filter for the subscriber when it starts serving the subscribe request.

The subscription preferences are passed to the message filter, and every message about to be delivered to the subscriber is first checked with #testMessage. If #testMessage returns true, the message enters the delivery process; otherwise, the hub server skips it and moves on to the next message.
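
The server-side behaviour above amounts to a filter-then-deliver loop. Below is a minimal sketch in which testMessage is modelled as a java.util.function.Predicate and delivery as a Consumer. *ServerDeliverySketch.deliver* is hypothetical and ignores everything else the hub server does while delivering.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class ServerDeliverySketch {
    // Each candidate message is first checked with testMessage; messages that
    // pass are handed to delivery, rejected ones are skipped and the loop moves
    // on to the next message. Returns how many messages were filtered out.
    static <M> int deliver(List<M> candidates, Predicate<M> testMessage,
                           Consumer<M> sendToSubscriber) {
        int skipped = 0;
        for (M msg : candidates) {
            if (testMessage.test(msg)) {
                sendToSubscriber.accept(msg);
            } else {
                skipped++;
            }
        }
        return skipped;
    }
}
```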

5) Client-side filter (BOOKKEEPER-334 & BOOKKEEPER-335)

A message filter is attached when calling #startDelivery to receive messages, and the subscription preferences are passed to the filter at that point. Messages are delivered to the messageHandler only if they pass the message filter.

Java client:
{code}

     /**
+     * Begin delivery of messages from the server to us for this topic and
+     * subscriberId.
+     *
+     * Only messages that pass <code>messageFilter</code> are delivered to
+     * <code>messageHandler</code>.
+     *
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param messageHandler
+     *            Message Handler that will consume the subscribed messages
+     * @param messageFilter
+     *            Message Filter that decides which messages reach the messageHandler
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
+     * @throws AlreadyStartDeliveryException
+     *             If delivery was already started with another message handler that has not been stopped.
+     */
+    public void startDelivery(ByteString topic, ByteString subscriberId,
+                              MessageHandler messageHandler, MessageFilter messageFilter)
+            throws ClientNotSubscribedException, AlreadyStartDeliveryException;
+
{code}

C++ client:
{code}
+
+  typedef std::tr1::shared_ptr<SubscriptionPreferences> SubscriptionPreferencesPtr;
+
+  class MessageFilter {
+  public:
+    virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+                                            const SubscriptionPreferencesPtr& preferences) = 0;
+    virtual bool testMessage(const Message& message) = 0;
+
+    virtual ~MessageFilter() {};
+  };
+  typedef std::tr1::shared_ptr<MessageFilter> MessageFilterPtr;


+    virtual void startDelivery(const std::string& topic, const std::string& subscriberId,
+                               const MessageHandlerCallbackPtr& callback,
+                               const MessageFilterPtr& filter) = 0;
+
{code}
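
The client-side contract can be modelled as a small per-subscription state machine. Preferences are handed to the filter when delivery starts, starting twice without stopping is an error, and only messages that pass the filter reach the handler. In the sketch below, *DeliverySession* and *ClientFilter* are hypothetical, and IllegalStateException stands in for AlreadyStartDeliveryException.

```java
import java.util.Map;
import java.util.function.Consumer;

// Simplified client-side filter (hypothetical): a String map stands in for
// SubscriptionPreferences.
interface ClientFilter<M> {
    void setSubscriptionPreferences(String topic, String subscriberId, Map<String, String> prefs);
    boolean testMessage(M message);
}

// Hypothetical model of one subscription's delivery state on the client.
final class DeliverySession<M> {
    private final String topic;
    private final String subscriberId;
    private final Map<String, String> prefs; // as returned in the subscribe response
    private Consumer<M> handler;
    private ClientFilter<M> filter;

    DeliverySession(String topic, String subscriberId, Map<String, String> prefs) {
        this.topic = topic;
        this.subscriberId = subscriberId;
        this.prefs = prefs;
    }

    // Starting twice without stopping fails; the preferences are passed to
    // the filter at start time.
    void startDelivery(Consumer<M> handler, ClientFilter<M> filter) {
        if (this.handler != null) {
            throw new IllegalStateException("delivery already started");
        }
        filter.setSubscriptionPreferences(topic, subscriberId, prefs);
        this.handler = handler;
        this.filter = filter;
    }

    void stopDelivery() {
        handler = null;
        filter = null;
    }

    // Only messages that pass the filter reach the handler. This sketch does
    // not model what happens to messages arriving while delivery is stopped.
    boolean onMessage(M message) {
        if (handler == null || !filter.testMessage(message)) {
            return false;
        }
        handler.accept(message);
        return true;
    }
}
```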


6) The customized properties in *MessageHeader* and *SubscriptionPreferences* allow different applications to implement their own filtering logic.
                
> Message Filter Support
> ----------------------
>
>                 Key: BOOKKEEPER-321
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-321
>             Project: Bookkeeper
>          Issue Type: Improvement
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>             Fix For: 4.2.0
>
>
> Support message filtering in hedwig.
> 1) add user-customized headers part in Message, which could be used for message filtering.
> 2) add user-customized subscription preferences, which could be used for message filtering.
> 3) support both server-side & client-side message filter.
