You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/03 17:14:14 UTC

svn commit: r1380268 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/api/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-...

Author: ivank
Date: Mon Sep  3 15:14:13 2012
New Revision: 1380268

URL: http://svn.apache.org/viewvc?rev=1380268&view=rev
Log:
BOOKKEEPER-334: client-side message filter for java client. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java
      - copied, changed from r1379445, zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java
      - copied, changed from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java
      - copied, changed from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java
Removed:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Sep  3 15:14:13 2012
@@ -134,6 +134,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
 
+        BOOKKEEPER-334: client-side message filter for java client. (sijie via ivank)
+
 Release 4.1.0 - 2012-06-07
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Mon Sep  3 15:14:13 2012
@@ -26,6 +26,7 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
@@ -118,7 +119,7 @@ public interface Subscriber {
      * <p>Subscribe to the given topic asynchronously for the inputted subscriberId.</p>
      *
      * <p>SubscriptionOptions contains parameters for how the hub should make the subscription.
-     * The two options are the createorattach mode and message bound.</p>
+     * The options includes createorattach mode, message bound and message filter.</p>
      *
      * <p>The createorattach mode defines whether the subscription should create a new subscription, or
      * just attach to a preexisting subscription. If it tries to create the subscription, and the
@@ -131,6 +132,14 @@ public interface Subscriber {
      * message bound, the message bound for all other subscriptions on that topic will effectively be
      * infinite as the messages have to be stored for the first subscription in any case. </p>
      *
+     * <p>The message filter defines a {@link org.apache.hedwig.filter.ServerMessageFilter}
+     * run in hub server to filter messages delivered to the subscription. The server message
+     * filter should be placed in the classpath of hub server before using it.</p>
+     *
+     * All these subscription options would be stored as SubscriptionPreferences in metadata
+     * manager. The next time subscriber attached with difference options, the new options would
+     * overwrite the old options.
+     *
      * Usage is as follows:
      * <pre>
      * {@code
@@ -271,6 +280,31 @@ public interface Subscriber {
             throws ClientNotSubscribedException, AlreadyStartDeliveryException;
 
     /**
+     * Begin delivery of messages from the server to us for this topic and
+     * subscriberId.
+     *
+     * Only the messages passed <code>messageFilter</code> could be delivered to
+     * <code>messageHandler</code>.
+     *
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param messageHandler
+     *            Message Handler that will consume the subscribed messages
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
+     * @throws AlreadyStartDeliveryException
+     *             If someone started delivery a message handler before stopping existed one.
+     * @throws NullPointerException
+     *             If either <code>messageHandler</code> or <code>messageFilter</code> is null.
+     */
+    public void startDeliveryWithFilter(ByteString topic, ByteString subscriberId,
+                                        MessageHandler messageHandler,
+                                        ClientMessageFilter messageFilter)
+            throws ClientNotSubscribedException, AlreadyStartDeliveryException;
+
+    /**
      * Stop delivery of messages for this topic and subscriberId.
      *
      * @param topic

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Mon Sep  3 15:14:13 2012
@@ -39,7 +39,11 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 
 public class SubscribeResponseHandler {
 
@@ -117,12 +121,28 @@ public class SubscribeResponseHandler {
                 // Subscribe request.
                 origSubData = pubSubData;
 
+                SubscriptionPreferences preferences = null;
+                if (response.hasResponseBody()) {
+                    ResponseBody respBody = response.getResponseBody();
+                    if (respBody.hasSubscribeResponse()) {
+                        SubscribeResponse resp = respBody.getSubscribeResponse();
+                        if (resp.hasPreferences()) {
+                            preferences = resp.getPreferences();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Receive subscription preferences for (topic:" + pubSubData.topic.toStringUtf8()
+                                           + ", subscriber:" + pubSubData.subscriberId.toStringUtf8() + ") :"
+                                           + SubscriptionStateUtils.toString(preferences));
+                            }
+                        }
+                    }
+                }
+
                 // Store the mapping for the TopicSubscriber to the Channel.
                 // This is so we can control the starting and stopping of
                 // message deliveries from the server on that Channel. Store
                 // this only on a successful ack response from the server.
                 TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
-                responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
+                responseHandler.getSubscriber().setChannelAndPreferencesForTopic(topicSubscriber, channel, preferences);
                 // Lazily create the Set (from a concurrent hashmap) to keep track
                 // of outstanding Messages to be consumed by the client app. At this
                 // stage, delivery for that topic hasn't started yet so creation of

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java?rev=1380268&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java Mon Sep  3 15:14:13 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Handlers used by a subscription.
+ */
+class FilterableMessageHandler implements MessageHandler {
+
+    MessageHandler msgHandler;
+    ClientMessageFilter  msgFilter;
+
+    public FilterableMessageHandler(MessageHandler msgHandler,
+                                    ClientMessageFilter msgFilter) {
+        this.msgHandler = msgHandler;
+        this.msgFilter = msgFilter;
+    }
+
+    public boolean hasMessageHandler() {
+        return null != msgHandler;
+    }
+
+    public MessageHandler getMessageHandler() {
+        return msgHandler;
+    }
+
+    public boolean hasMessageFilter() {
+        return null != msgFilter;
+    }
+
+    public ClientMessageFilter getMessageFilter() {
+        return msgFilter;
+    }
+
+    @Override
+    public void deliver(ByteString topic, ByteString subscriberId, Message msg,
+                        Callback<Void> callback, Object context) {
+        boolean deliver = true;
+        if (hasMessageFilter()) {
+            deliver = msgFilter.testMessage(msg);
+        }
+        if (deliver) {
+            msgHandler.deliver(topic, subscriberId, msg, callback, context);
+        } else {
+            callback.operationFinished(context, null);
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Mon Sep  3 15:14:13 2012
@@ -41,6 +41,7 @@ import org.apache.hedwig.client.data.Top
 import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
 import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
 import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.filter.ClientMessageFilter;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
@@ -73,6 +74,8 @@ public class HedwigSubscriber implements
     // it. We can also get the ResponseHandler tied to the Channel via the
     // Channel Pipeline.
     protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+    protected final ConcurrentMap<TopicSubscriber, SubscriptionPreferences> topicSubscriber2Preferences =
+        new ConcurrentHashMap<TopicSubscriber, SubscriptionPreferences>();
 
     // Concurrent Map to store Message handler for each topic + sub id combination.
     // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
@@ -543,6 +546,30 @@ public class HedwigSubscriber implements
         startDelivery(topic, subscriberId, messageHandler, false);
     }
 
+    public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
+                                        MessageHandler messageHandler,
+                                        ClientMessageFilter messageFilter)
+            throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+        if (null == messageHandler || null == messageFilter) {
+            throw new NullPointerException("Null message handler or message filter is provided.");
+        }
+        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+        SubscriptionPreferences preferences = topicSubscriber2Preferences.get(topicSubscriber);
+        if (null == preferences) {
+            throw new ClientNotSubscribedException("No subscription preferences found to filter messages for topic: "
+                    + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+        }
+        // pass subscription preferences to message filter
+        if (logger.isDebugEnabled()) {
+            logger.debug("Start delivering messages with filter on topic: " + topic.toStringUtf8()
+                         + ", subscriberId: " + subscriberId.toStringUtf8() + ", preferences: "
+                         + SubscriptionStateUtils.toString(preferences));
+        }
+        messageFilter.setSubscriptionPreferences(topic, subscriberId, preferences);
+        messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
+        startDelivery(topic, subscriberId, messageHandler, false);
+    }
+
     public void restartDelivery(final ByteString topic, final ByteString subscriberId)
         throws ClientNotSubscribedException, AlreadyStartDeliveryException {
         startDelivery(topic, subscriberId, null, true);
@@ -743,7 +770,8 @@ public class HedwigSubscriber implements
         return topicSubscriber2Channel.get(topic);
     }
 
-    public void setChannelForTopic(TopicSubscriber topic, Channel channel) {
+    public void setChannelAndPreferencesForTopic(TopicSubscriber topic, Channel channel,
+                                                 SubscriptionPreferences preferences) {
         synchronized (closeLock) {
             if (closed) {
                 channel.close();
@@ -753,11 +781,17 @@ public class HedwigSubscriber implements
             if (oldc != null) {
                 channel.close();
             }
+            if (null != preferences) {
+                topicSubscriber2Preferences.put(topic, preferences);
+            }
         }
     }
 
-    public void removeChannelForTopic(TopicSubscriber topic) {
-        topicSubscriber2Channel.remove(topic);
+    public void removeTopicSubscriber(TopicSubscriber topic) {
+        synchronized (topic) {
+            topicSubscriber2Preferences.remove(topic);
+            topicSubscriber2Channel.remove(topic);
+        }
     }
 
     void close() {

Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java (from r1379445, zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java&p1=zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java&r1=1379445&r2=1380268&rev=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java Mon Sep  3 15:14:13 2012
@@ -15,22 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hedwig.server.delivery;
+package org.apache.hedwig.filter;
 
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.filter.MessageFilter;
-
-public interface DeliveryManager {
-    public void start();
-
-    public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint, MessageFilter filter);
-
-    public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
-
-    /**
-     * Stop delivery manager
-     */
-    public void stop();
+/**
+ * Message Filter running in client-side.
+ */
+public interface ClientMessageFilter extends MessageFilterBase {
 }

Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java (from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java&r1=1379445&r2=1380268&rev=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java Mon Sep  3 15:14:13 2012
@@ -17,31 +17,11 @@
  */
 package org.apache.hedwig.filter;
 
-import java.io.IOException;
-
 import com.google.protobuf.ByteString;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 
-public interface MessageFilter {
-
-    /**
-     * Initialize the message filter.
-     *
-     * @param conf
-     *          Configuration Object. An <i>MessageFilter</i> might read settings from it.
-     * @return message filter
-     * @throws IOException when failed to initialize message filter
-     */
-    public MessageFilter initialize(Configuration conf)
-    throws ConfigurationException, IOException;
-
-    /**
-     * Uninitialize the message filter.
-     */
-    public void uninitialize();
+public interface MessageFilterBase {
 
     /**
      * Set subscription preferences.
@@ -57,8 +37,8 @@ public interface MessageFilter {
      *          Subscription Preferences.
      * @return message filter
      */
-    public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                                                    SubscriptionPreferences preferences);
+    public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+                                                        SubscriptionPreferences preferences);
 
     /**
      * Tests whether a particular message passes the filter or not

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java Mon Sep  3 15:14:13 2012
@@ -24,19 +24,19 @@ import java.util.LinkedList;
 import com.google.protobuf.ByteString;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.filter.MessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 
 /**
  * A filter filters messages in pipeline.
  */
-public class PipelineFilter extends LinkedList<MessageFilter> implements MessageFilter {
+public class PipelineFilter extends LinkedList<ServerMessageFilter>
+implements ServerMessageFilter {
 
     @Override
-    public MessageFilter initialize(Configuration conf)
+    public ServerMessageFilter initialize(Configuration conf)
     throws ConfigurationException, IOException {
-        for (MessageFilter filter : this) {
+        for (ServerMessageFilter filter : this) {
             filter.initialize(conf);
         }
         return this;
@@ -45,15 +45,15 @@ public class PipelineFilter extends Link
     @Override
     public void uninitialize() {
         while (!isEmpty()) {
-            MessageFilter filter = removeLast();
+            ServerMessageFilter filter = removeLast();
             filter.uninitialize();
         }
     }
 
     @Override
-    public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                                                    SubscriptionPreferences preferences) {
-        for (MessageFilter filter : this) {
+    public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+                                                        SubscriptionPreferences preferences) {
+        for (ServerMessageFilter filter : this) {
             filter.setSubscriptionPreferences(topic, subscriberId, preferences);
         }
         return this;
@@ -61,7 +61,7 @@ public class PipelineFilter extends Link
 
     @Override
     public boolean testMessage(Message message) {
-        for (MessageFilter filter : this) {
+        for (ServerMessageFilter filter : this) {
             if (!filter.testMessage(message)) {
                 return false;
             }

Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java (from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java&r1=1379445&r2=1380268&rev=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java Mon Sep  3 15:14:13 2012
@@ -19,13 +19,14 @@ package org.apache.hedwig.filter;
 
 import java.io.IOException;
 
-import com.google.protobuf.ByteString;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 
-public interface MessageFilter {
+/**
+ * Message Filter running in server-side. Hub server uses reflection to
+ * instantiate a message filter to filter messages.
+ */
+public interface ServerMessageFilter extends MessageFilterBase {
 
     /**
      * Initialize the message filter.
@@ -35,7 +36,7 @@ public interface MessageFilter {
      * @return message filter
      * @throws IOException when failed to initialize message filter
      */
-    public MessageFilter initialize(Configuration conf)
+    public ServerMessageFilter initialize(Configuration conf)
     throws ConfigurationException, IOException;
 
     /**
@@ -43,28 +44,4 @@ public interface MessageFilter {
      */
     public void uninitialize();
 
-    /**
-     * Set subscription preferences.
-     *
-     * <code>preferences</code> of the subscriber will be passed to message filter when
-     * the message filter attaches to its subscription either in server-side or client-side.
-     *
-     * @param topic
-     *          Topic Name.
-     * @param subscriberId
-     *          Subscriber Id.
-     * @param preferences
-     *          Subscription Preferences.
-     * @return message filter
-     */
-    public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                                                    SubscriptionPreferences preferences);
-
-    /**
-     * Tests whether a particular message passes the filter or not
-     *
-     * @param message
-     * @return
-     */
-    public boolean testMessage(Message message);
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Mon Sep  3 15:14:13 2012
@@ -10593,6 +10593,430 @@ public final class PubSubProtocol {
     // @@protoc_insertion_point(class_scope:Hedwig.PublishResponse)
   }
   
+  public interface SubscribeResponseOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+    
+    // optional .Hedwig.SubscriptionPreferences preferences = 2;
+    boolean hasPreferences();
+    org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences();
+    org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder();
+  }
+  public static final class SubscribeResponse extends
+      com.google.protobuf.GeneratedMessage
+      implements SubscribeResponseOrBuilder {
+    // Use SubscribeResponse.newBuilder() to construct.
+    private SubscribeResponse(Builder builder) {
+      super(builder);
+    }
+    private SubscribeResponse(boolean noInit) {}
+    
+    private static final SubscribeResponse defaultInstance;
+    public static SubscribeResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public SubscribeResponse getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_fieldAccessorTable;
+    }
+    
+    private int bitField0_;
+    // optional .Hedwig.SubscriptionPreferences preferences = 2;
+    public static final int PREFERENCES_FIELD_NUMBER = 2;
+    private org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences_;
+    public boolean hasPreferences() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences() {
+      return preferences_;
+    }
+    public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder() {
+      return preferences_;
+    }
+    
+    private void initFields() {
+      preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeMessage(2, preferences_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(2, preferences_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          getPreferencesFieldBuilder();
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        if (preferencesBuilder_ == null) {
+          preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+        } else {
+          preferencesBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDescriptor();
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getDefaultInstanceForType() {
+        return org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse build() {
+        org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse buildPartial() {
+        org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse result = new org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        if (preferencesBuilder_ == null) {
+          result.preferences_ = preferences_;
+        } else {
+          result.preferences_ = preferencesBuilder_.build();
+        }
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse) {
+          return mergeFrom((org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse other) {
+        if (other == org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance()) return this;
+        if (other.hasPreferences()) {
+          mergePreferences(other.getPreferences());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 18: {
+              org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder subBuilder = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.newBuilder();
+              if (hasPreferences()) {
+                subBuilder.mergeFrom(getPreferences());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setPreferences(subBuilder.buildPartial());
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // optional .Hedwig.SubscriptionPreferences preferences = 2;
+      private org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder> preferencesBuilder_;
+      public boolean hasPreferences() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences() {
+        if (preferencesBuilder_ == null) {
+          return preferences_;
+        } else {
+          return preferencesBuilder_.getMessage();
+        }
+      }
+      public Builder setPreferences(org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences value) {
+        if (preferencesBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          preferences_ = value;
+          onChanged();
+        } else {
+          preferencesBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder setPreferences(
+          org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder builderForValue) {
+        if (preferencesBuilder_ == null) {
+          preferences_ = builderForValue.build();
+          onChanged();
+        } else {
+          preferencesBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder mergePreferences(org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences value) {
+        if (preferencesBuilder_ == null) {
+          if (((bitField0_ & 0x00000001) == 0x00000001) &&
+              preferences_ != org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance()) {
+            preferences_ =
+              org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.newBuilder(preferences_).mergeFrom(value).buildPartial();
+          } else {
+            preferences_ = value;
+          }
+          onChanged();
+        } else {
+          preferencesBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder clearPreferences() {
+        if (preferencesBuilder_ == null) {
+          preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+          onChanged();
+        } else {
+          preferencesBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder getPreferencesBuilder() {
+        bitField0_ |= 0x00000001;
+        onChanged();
+        return getPreferencesFieldBuilder().getBuilder();
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder() {
+        if (preferencesBuilder_ != null) {
+          return preferencesBuilder_.getMessageOrBuilder();
+        } else {
+          return preferences_;
+        }
+      }
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder> 
+          getPreferencesFieldBuilder() {
+        if (preferencesBuilder_ == null) {
+          preferencesBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder>(
+                  preferences_,
+                  getParentForChildren(),
+                  isClean());
+          preferences_ = null;
+        }
+        return preferencesBuilder_;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:Hedwig.SubscribeResponse)
+    }
+    
+    static {
+      defaultInstance = new SubscribeResponse(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:Hedwig.SubscribeResponse)
+  }
+  
   public interface ResponseBodyOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
     
@@ -10600,6 +11024,11 @@ public final class PubSubProtocol {
     boolean hasPublishResponse();
     org.apache.hedwig.protocol.PubSubProtocol.PublishResponse getPublishResponse();
     org.apache.hedwig.protocol.PubSubProtocol.PublishResponseOrBuilder getPublishResponseOrBuilder();
+    
+    // optional .Hedwig.SubscribeResponse subscribeResponse = 2;
+    boolean hasSubscribeResponse();
+    org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getSubscribeResponse();
+    org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder getSubscribeResponseOrBuilder();
   }
   public static final class ResponseBody extends
       com.google.protobuf.GeneratedMessage
@@ -10643,8 +11072,22 @@ public final class PubSubProtocol {
       return publishResponse_;
     }
     
+    // optional .Hedwig.SubscribeResponse subscribeResponse = 2;
+    public static final int SUBSCRIBERESPONSE_FIELD_NUMBER = 2;
+    private org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse subscribeResponse_;
+    public boolean hasSubscribeResponse() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getSubscribeResponse() {
+      return subscribeResponse_;
+    }
+    public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder getSubscribeResponseOrBuilder() {
+      return subscribeResponse_;
+    }
+    
     private void initFields() {
       publishResponse_ = org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.getDefaultInstance();
+      subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -10667,6 +11110,9 @@ public final class PubSubProtocol {
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeMessage(1, publishResponse_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(2, subscribeResponse_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -10680,6 +11126,10 @@ public final class PubSubProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(1, publishResponse_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(2, subscribeResponse_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -10797,6 +11247,7 @@ public final class PubSubProtocol {
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
           getPublishResponseFieldBuilder();
+          getSubscribeResponseFieldBuilder();
         }
       }
       private static Builder create() {
@@ -10811,6 +11262,12 @@ public final class PubSubProtocol {
           publishResponseBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000001);
+        if (subscribeResponseBuilder_ == null) {
+          subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+        } else {
+          subscribeResponseBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
       
@@ -10857,6 +11314,14 @@ public final class PubSubProtocol {
         } else {
           result.publishResponse_ = publishResponseBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        if (subscribeResponseBuilder_ == null) {
+          result.subscribeResponse_ = subscribeResponse_;
+        } else {
+          result.subscribeResponse_ = subscribeResponseBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -10876,6 +11341,9 @@ public final class PubSubProtocol {
         if (other.hasPublishResponse()) {
           mergePublishResponse(other.getPublishResponse());
         }
+        if (other.hasSubscribeResponse()) {
+          mergeSubscribeResponse(other.getSubscribeResponse());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -10922,6 +11390,15 @@ public final class PubSubProtocol {
               setPublishResponse(subBuilder.buildPartial());
               break;
             }
+            case 18: {
+              org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder subBuilder = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.newBuilder();
+              if (hasSubscribeResponse()) {
+                subBuilder.mergeFrom(getSubscribeResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setSubscribeResponse(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -11018,6 +11495,96 @@ public final class PubSubProtocol {
         return publishResponseBuilder_;
       }
       
+      // optional .Hedwig.SubscribeResponse subscribeResponse = 2;
+      private org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder> subscribeResponseBuilder_;
+      public boolean hasSubscribeResponse() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getSubscribeResponse() {
+        if (subscribeResponseBuilder_ == null) {
+          return subscribeResponse_;
+        } else {
+          return subscribeResponseBuilder_.getMessage();
+        }
+      }
+      public Builder setSubscribeResponse(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse value) {
+        if (subscribeResponseBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          subscribeResponse_ = value;
+          onChanged();
+        } else {
+          subscribeResponseBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder setSubscribeResponse(
+          org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder builderForValue) {
+        if (subscribeResponseBuilder_ == null) {
+          subscribeResponse_ = builderForValue.build();
+          onChanged();
+        } else {
+          subscribeResponseBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder mergeSubscribeResponse(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse value) {
+        if (subscribeResponseBuilder_ == null) {
+          if (((bitField0_ & 0x00000002) == 0x00000002) &&
+              subscribeResponse_ != org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance()) {
+            subscribeResponse_ =
+              org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.newBuilder(subscribeResponse_).mergeFrom(value).buildPartial();
+          } else {
+            subscribeResponse_ = value;
+          }
+          onChanged();
+        } else {
+          subscribeResponseBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder clearSubscribeResponse() {
+        if (subscribeResponseBuilder_ == null) {
+          subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+          onChanged();
+        } else {
+          subscribeResponseBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder getSubscribeResponseBuilder() {
+        bitField0_ |= 0x00000002;
+        onChanged();
+        return getSubscribeResponseFieldBuilder().getBuilder();
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder getSubscribeResponseOrBuilder() {
+        if (subscribeResponseBuilder_ != null) {
+          return subscribeResponseBuilder_.getMessageOrBuilder();
+        } else {
+          return subscribeResponse_;
+        }
+      }
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder> 
+          getSubscribeResponseFieldBuilder() {
+        if (subscribeResponseBuilder_ == null) {
+          subscribeResponseBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder>(
+                  subscribeResponse_,
+                  getParentForChildren(),
+                  isClean());
+          subscribeResponse_ = null;
+        }
+        return subscribeResponseBuilder_;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Hedwig.ResponseBody)
     }
     
@@ -14497,6 +15064,11 @@ public final class PubSubProtocol {
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_Hedwig_PublishResponse_fieldAccessorTable;
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_Hedwig_SubscribeResponse_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_Hedwig_SubscribeResponse_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_Hedwig_ResponseBody_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -14598,38 +15170,41 @@ public final class PubSubProtocol {
       "\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014responseBody\030" +
       "\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017PublishRe" +
       "sponse\022,\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig." +
-      "MessageSeqId\"@\n\014ResponseBody\0220\n\017publishR" +
-      "esponse\030\001 \001(\0132\027.Hedwig.PublishResponse\"N" +
-      "\n\021SubscriptionState\022#\n\005msgId\030\001 \002(\0132\024.Hed" +
-      "wig.MessageSeqId\022\024\n\014messageBound\030\002 \001(\r\"r" +
-      "\n\020SubscriptionData\022(\n\005state\030\001 \001(\0132\031.Hedw" +
-      "ig.SubscriptionState\0224\n\013preferences\030\002 \001(" +
-      "\0132\037.Hedwig.SubscriptionPreferences\"O\n\013Le",
-      "dgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdI" +
-      "ncluded\030\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014L" +
-      "edgerRanges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.Led" +
-      "gerRange\":\n\013ManagerMeta\022\023\n\013managerImpl\030\002" +
-      " \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n\013HubInfoD" +
-      "ata\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013" +
-      "HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017Protoc" +
-      "olVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOperationT" +
-      "ype\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSU" +
-      "ME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020",
-      "\004\022\021\n\rSTOP_DELIVERY\020\005*\205\004\n\nStatusCode\022\013\n\007S" +
-      "UCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_S" +
-      "UCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBE" +
-      "D\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COUL" +
-      "D_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT" +
-      "_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOW" +
-      "N\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVALID_ME" +
-      "SSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_" +
-      "TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSI" +
-      "STENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION",
-      "_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214" +
-      "\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWN" +
-      "ER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITIO" +
-      "N\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apache.hedwi" +
-      "g.protocolH\001"
+      "MessageSeqId\"I\n\021SubscribeResponse\0224\n\013pre" +
+      "ferences\030\002 \001(\0132\037.Hedwig.SubscriptionPref" +
+      "erences\"v\n\014ResponseBody\0220\n\017publishRespon" +
+      "se\030\001 \001(\0132\027.Hedwig.PublishResponse\0224\n\021sub" +
+      "scribeResponse\030\002 \001(\0132\031.Hedwig.SubscribeR" +
+      "esponse\"N\n\021SubscriptionState\022#\n\005msgId\030\001 " +
+      "\002(\0132\024.Hedwig.MessageSeqId\022\024\n\014messageBoun",
+      "d\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005state\030\001 \001" +
+      "(\0132\031.Hedwig.SubscriptionState\0224\n\013prefere" +
+      "nces\030\002 \001(\0132\037.Hedwig.SubscriptionPreferen" +
+      "ces\"O\n\013LedgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020" +
+      "endSeqIdIncluded\030\002 \001(\0132\024.Hedwig.MessageS" +
+      "eqId\"3\n\014LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.H" +
+      "edwig.LedgerRange\":\n\013ManagerMeta\022\023\n\013mana" +
+      "gerImpl\030\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n" +
+      "\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030" +
+      "\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*",
+      "\"\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020\001*p\n\rO" +
+      "perationType\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001" +
+      "\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_" +
+      "DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*\205\004\n\nStatus" +
+      "Code\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221" +
+      "\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_" +
+      "SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224" +
+      "\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020" +
+      "\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SE" +
+      "RVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026I",
+      "NVALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020" +
+      "\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TO" +
+      "PIC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUB" +
+      "SCRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE" +
+      "_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027" +
+      "TOPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED" +
+      "_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apa" +
+      "che.hedwig.protocolH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14772,16 +15347,24 @@ public final class PubSubProtocol {
               new java.lang.String[] { "PublishedMsgId", },
               org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.class,
               org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.Builder.class);
-          internal_static_Hedwig_ResponseBody_descriptor =
+          internal_static_Hedwig_SubscribeResponse_descriptor =
             getDescriptor().getMessageTypes().get(16);
+          internal_static_Hedwig_SubscribeResponse_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_Hedwig_SubscribeResponse_descriptor,
+              new java.lang.String[] { "Preferences", },
+              org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.class,
+              org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder.class);
+          internal_static_Hedwig_ResponseBody_descriptor =
+            getDescriptor().getMessageTypes().get(17);
           internal_static_Hedwig_ResponseBody_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_ResponseBody_descriptor,
-              new java.lang.String[] { "PublishResponse", },
+              new java.lang.String[] { "PublishResponse", "SubscribeResponse", },
               org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.class,
               org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.Builder.class);
           internal_static_Hedwig_SubscriptionState_descriptor =
-            getDescriptor().getMessageTypes().get(17);
+            getDescriptor().getMessageTypes().get(18);
           internal_static_Hedwig_SubscriptionState_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionState_descriptor,
@@ -14789,7 +15372,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.Builder.class);
           internal_static_Hedwig_SubscriptionData_descriptor =
-            getDescriptor().getMessageTypes().get(18);
+            getDescriptor().getMessageTypes().get(19);
           internal_static_Hedwig_SubscriptionData_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionData_descriptor,
@@ -14797,7 +15380,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.Builder.class);
           internal_static_Hedwig_LedgerRange_descriptor =
-            getDescriptor().getMessageTypes().get(19);
+            getDescriptor().getMessageTypes().get(20);
           internal_static_Hedwig_LedgerRange_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_LedgerRange_descriptor,
@@ -14805,7 +15388,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.class,
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.Builder.class);
           internal_static_Hedwig_LedgerRanges_descriptor =
-            getDescriptor().getMessageTypes().get(20);
+            getDescriptor().getMessageTypes().get(21);
           internal_static_Hedwig_LedgerRanges_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_LedgerRanges_descriptor,
@@ -14813,7 +15396,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.class,
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.Builder.class);
           internal_static_Hedwig_ManagerMeta_descriptor =
-            getDescriptor().getMessageTypes().get(21);
+            getDescriptor().getMessageTypes().get(22);
           internal_static_Hedwig_ManagerMeta_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_ManagerMeta_descriptor,
@@ -14821,7 +15404,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.class,
               org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.Builder.class);
           internal_static_Hedwig_HubInfoData_descriptor =
-            getDescriptor().getMessageTypes().get(22);
+            getDescriptor().getMessageTypes().get(23);
           internal_static_Hedwig_HubInfoData_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_HubInfoData_descriptor,
@@ -14829,7 +15412,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.class,
               org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.Builder.class);
           internal_static_Hedwig_HubLoadData_descriptor =
-            getDescriptor().getMessageTypes().get(23);
+            getDescriptor().getMessageTypes().get(24);
           internal_static_Hedwig_HubLoadData_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_HubLoadData_descriptor,

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java Mon Sep  3 15:14:13 2012
@@ -20,6 +20,7 @@ package org.apache.hedwig.protoextension
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
 
 public class PubSubResponseUtils {
@@ -37,7 +38,13 @@ public class PubSubResponseUtils {
         return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId).build();
     }
 
+    public static PubSubResponse getSuccessResponse(long txnId, ResponseBody respBody) {
+        return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId)
+               .setResponseBody(respBody).build();
+    }
+
     public static PubSubResponse getResponseForException(PubSubException e, long txnId) {
         return getBasicBuilder(e.getCode()).setStatusMsg(e.getMessage()).setTxnId(txnId).build();
     }
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Mon Sep  3 15:14:13 2012
@@ -193,8 +193,13 @@ message PublishResponse {
     required MessageSeqId publishedMsgId = 1;
 }
 
+message SubscribeResponse {
+    optional SubscriptionPreferences preferences = 2;
+}
+
 message ResponseBody {
     optional PublishResponse publishResponse = 1;
+    optional SubscribeResponse subscribeResponse = 2;
 }
 
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Mon Sep  3 15:14:13 2012
@@ -19,13 +19,13 @@ package org.apache.hedwig.server.deliver
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
 
 public interface DeliveryManager {
     public void start();
 
     public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint, MessageFilter filter);
+                                         DeliveryEndPoint endPoint, ServerMessageFilter filter);
 
     public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Sep  3 15:14:13 2012
@@ -36,7 +36,7 @@ import com.google.protobuf.ByteString;
 
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
@@ -137,7 +137,7 @@ public class FIFODeliveryManager impleme
      *            subscriber
      */
     public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint, MessageFilter filter) {
+                                         DeliveryEndPoint endPoint, ServerMessageFilter filter) {
 
         ActiveSubscriberState subscriber = new ActiveSubscriberState(topic, subscriberId, seqIdToStartFrom
                 .getLocalComponent() - 1, endPoint, filter);
@@ -299,13 +299,13 @@ public class FIFODeliveryManager impleme
         long lastScanErrorTime = -1;
         long localSeqIdDeliveringNow;
         long lastSeqIdCommunicatedExternally;
-        MessageFilter filter;
+        ServerMessageFilter filter;
         // TODO make use of these variables
 
         final static int SEQ_ID_SLACK = 10;
 
         public ActiveSubscriberState(ByteString topic, ByteString subscriberId, long lastLocalSeqIdDelivered,
-                                     DeliveryEndPoint deliveryEndPoint, MessageFilter filter) {
+                                     DeliveryEndPoint deliveryEndPoint, ServerMessageFilter filter) {
             this.topic = topic;
             this.subscriberId = subscriberId;
             this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
@@ -316,6 +316,8 @@ public class FIFODeliveryManager impleme
         public void setNotConnected() {
             this.connected = false;
             deliveryEndPoint.close();
+            // uninitialize filter
+            this.filter.uninitialize();
         }
 
         public ByteString getTopic() {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Mon Sep  3 15:14:13 2012
@@ -31,12 +31,14 @@ import org.apache.bookkeeper.util.Reflec
 import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.filter.MessageFilter;
 import org.apache.hedwig.filter.PipelineFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.protoextensions.PubSubResponseUtils;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
@@ -164,7 +166,7 @@ public class SubscribeHandler extends Ba
                     if (subData.hasPreferences() &&
                         subData.getPreferences().hasMessageFilter()) {
                         String messageFilterName = subData.getPreferences().getMessageFilter();
-                        filter.addLast(ReflectionUtils.newInstance(messageFilterName, MessageFilter.class));
+                        filter.addLast(ReflectionUtils.newInstance(messageFilterName, ServerMessageFilter.class));
                     }
                     // initialize the filter
                     filter.initialize(cfg.getConf());
@@ -194,7 +196,14 @@ public class SubscribeHandler extends Ba
                 // First write success and then tell the delivery manager,
                 // otherwise the first message might go out before the response
                 // to the subscribe
-                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                SubscribeResponse.Builder subRespBuilder = SubscribeResponse.newBuilder()
+                    .setPreferences(subData.getPreferences());
+                ResponseBody respBody = ResponseBody.newBuilder()
+                    .setSubscribeResponse(subRespBuilder).build();
+                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId(), respBody));
+                logger.info("Subscribe request (" + request.getTxnId() + ") for (topic:" + topic.toStringUtf8()
+                          + ", subscriber:" + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
+                          + " succeed - its subscription data is " + SubscriptionStateUtils.toString(subData));
                 subStats.updateLatency(MathUtils.now() - requestTime);
 
                 // want to start 1 ahead of the consume ptr

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java Mon Sep  3 15:14:13 2012
@@ -22,19 +22,20 @@ import java.io.IOException;
 import com.google.protobuf.ByteString;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.MessageFilterBase;
+import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
 
-public class AllToAllTopologyFilter implements MessageFilter {
+public class AllToAllTopologyFilter implements ServerMessageFilter {
 
     ByteString subscriberRegion;
     boolean isHubSubscriber;
 
     @Override
-    public MessageFilter initialize(Configuration conf)
+    public ServerMessageFilter initialize(Configuration conf)
     throws ConfigurationException, IOException {
         String region = conf.getString(ServerConfiguration.REGION, "standalone");
         if (null == region) {
@@ -50,8 +51,8 @@ public class AllToAllTopologyFilter impl
     }
 
     @Override
-    public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                                                    SubscriptionPreferences preferences) {
+    public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+                                                        SubscriptionPreferences preferences) {
         isHubSubscriber = SubscriptionStateUtils.isHubSubscriber(subscriberId);
         return this;
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Mon Sep  3 15:14:13 2012
@@ -19,6 +19,7 @@ package org.apache.hedwig.server;
 
 import java.net.InetAddress;
 import java.io.File;
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -658,6 +661,78 @@ public class TestBackwardCompat extends 
             subscriber.closeSubscription(topic, subid);
         }
 
+        void receiveNumModM(final ByteString topic, final ByteString subid,
+                            final int start, final int num, final int M) throws Exception {
+            org.apache.hedwig.filter.ServerMessageFilter filter =
+                new org.apache.hedwig.filter.ServerMessageFilter() {
+
+                @Override
+                public org.apache.hedwig.filter.ServerMessageFilter
+                    initialize(Configuration conf) {
+                    // do nothing
+                    return this;
+                }
+
+                @Override
+                public void uninitialize() {
+                    // do nothing;
+                }
+
+                @Override
+                public org.apache.hedwig.filter.MessageFilterBase
+                    setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+                    org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences) {
+                    // do nothing;
+                    return this;
+                }
+
+                @Override
+                public boolean testMessage(org.apache.hedwig.protocol.PubSubProtocol.Message msg) {
+                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
+                    return 0 == value % M;
+                }
+            };
+            filter.initialize(conf.getConf());
+
+            subscriber.subscribe(topic, subid, org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH);
+            final int base = start + M - start % M;
+            final AtomicInteger expected = new AtomicInteger(base);
+            final CountDownLatch latch = new CountDownLatch(1);
+            subscriber.startDeliveryWithFilter(topic, subid, new org.apache.hedwig.client.api.MessageHandler() {
+                synchronized public void deliver(ByteString topic, ByteString subscriberId,
+                                                 org.apache.hedwig.protocol.PubSubProtocol.Message msg,
+                                                 org.apache.hedwig.util.Callback<Void> callback, Object context) {
+                    try {
+                        int value = Integer.valueOf(msg.getBody().toStringUtf8());
+                        // duplicated messages received, ignore them
+                        if (value > start) {
+                            if (value == expected.get()) {
+                                expected.addAndGet(M);
+                            } else {
+                                logger.error("Did not receive expected value, expected {}, got {}",
+                                             expected.get(), value);
+                                expected.set(0);
+                                latch.countDown();
+                            }
+                            if (expected.get() == (base + num * M)) {
+                                latch.countDown();
+                            }
+                        }
+                        callback.operationFinished(context, null);
+                    } catch (Exception e) {
+                        logger.error("Received bad message", e);
+                        latch.countDown();
+                    }
+                }
+            }, (org.apache.hedwig.filter.ClientMessageFilter) filter);
+            assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(),
+                       latch.await(10, TimeUnit.SECONDS));
+            assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get());
+            subscriber.stopDelivery(topic, subid);
+            filter.uninitialize();
+            subscriber.closeSubscription(topic, subid);
+        }
+
         void subscribe(ByteString topic, ByteString subscriberId) throws Exception {
             org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
                 org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
@@ -918,4 +993,40 @@ public class TestBackwardCompat extends 
         bkccur.stop();
     }
 
+    /**
+     * Test compatability between version 4.1.0 and the current version.
+     *
+     * A current client running message filter would fail on 4.1.0 hub servers.
+     */
+    @Test
+    public void testClientMessageFilterCompat410() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+
+        // start bookkeeper
+        BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
+        bkc410.start();
+
+        // start hub server 410
+        Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString());
+        s410.start();
+
+        ClientCurrent ccur = new ClientCurrent();
+        ccur.subscribe(topic, subid);
+        ccur.closeSubscription(topic, subid);
+
+        ccur.publishInts(topic, 0, 100);
+        try {
+            ccur.receiveNumModM(topic, subid, 0, 50, 2);
+            fail("client-side filter could not run on 4.1.0 hub server");
+        } catch (Exception e) {
+            logger.info("Should fail to run client-side message filter on 4.1.0 hub server.", e);
+            ccur.closeSubscription(topic, subid);
+        }
+
+        // stop 410 server
+        s410.stop();
+        // stop bookkeeper cluster
+        bkc410.stop();
+    }
 }