You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/03 17:14:14 UTC
svn commit: r1380268 [1/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/api/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-...
Author: ivank
Date: Mon Sep 3 15:14:13 2012
New Revision: 1380268
URL: http://svn.apache.org/viewvc?rev=1380268&view=rev
Log:
BOOKKEEPER-334: client-side message filter for java client. (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java
- copied, changed from r1379445, zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java
- copied, changed from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java
- copied, changed from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java
Removed:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Sep 3 15:14:13 2012
@@ -134,6 +134,8 @@ Trunk (unreleased changes)
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
+ BOOKKEEPER-334: client-side message filter for java client. (sijie via ivank)
+
Release 4.1.0 - 2012-06-07
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Mon Sep 3 15:14:13 2012
@@ -26,6 +26,7 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
@@ -118,7 +119,7 @@ public interface Subscriber {
* <p>Subscribe to the given topic asynchronously for the inputted subscriberId.</p>
*
* <p>SubscriptionOptions contains parameters for how the hub should make the subscription.
- * The two options are the createorattach mode and message bound.</p>
+ * The options includes createorattach mode, message bound and message filter.</p>
*
* <p>The createorattach mode defines whether the subscription should create a new subscription, or
* just attach to a preexisting subscription. If it tries to create the subscription, and the
@@ -131,6 +132,14 @@ public interface Subscriber {
* message bound, the message bound for all other subscriptions on that topic will effectively be
* infinite as the messages have to be stored for the first subscription in any case. </p>
*
+ * <p>The message filter defines a {@link org.apache.hedwig.filter.ServerMessageFilter}
+ * run in hub server to filter messages delivered to the subscription. The server message
+ * filter should be placed in the classpath of hub server before using it.</p>
+ *
+ * All these subscription options would be stored as SubscriptionPreferences in metadata
+ * manager. The next time subscriber attached with difference options, the new options would
+ * overwrite the old options.
+ *
* Usage is as follows:
* <pre>
* {@code
@@ -271,6 +280,31 @@ public interface Subscriber {
throws ClientNotSubscribedException, AlreadyStartDeliveryException;
/**
+ * Begin delivery of messages from the server to us for this topic and
+ * subscriberId.
+ *
+ * Only the messages passed <code>messageFilter</code> could be delivered to
+ * <code>messageHandler</code>.
+ *
+ * @param topic
+ * Topic name of the subscription
+ * @param subscriberId
+ * ID of the subscriber
+ * @param messageHandler
+ * Message Handler that will consume the subscribed messages
+ * @throws ClientNotSubscribedException
+ * If the client is not currently subscribed to the topic
+ * @throws AlreadyStartDeliveryException
+ * If someone started delivery a message handler before stopping existed one.
+ * @throws NullPointerException
+ * If either <code>messageHandler</code> or <code>messageFilter</code> is null.
+ */
+ public void startDeliveryWithFilter(ByteString topic, ByteString subscriberId,
+ MessageHandler messageHandler,
+ ClientMessageFilter messageFilter)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException;
+
+ /**
* Stop delivery of messages for this topic and subscriberId.
*
* @param topic
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Mon Sep 3 15:14:13 2012
@@ -39,7 +39,11 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
public class SubscribeResponseHandler {
@@ -117,12 +121,28 @@ public class SubscribeResponseHandler {
// Subscribe request.
origSubData = pubSubData;
+ SubscriptionPreferences preferences = null;
+ if (response.hasResponseBody()) {
+ ResponseBody respBody = response.getResponseBody();
+ if (respBody.hasSubscribeResponse()) {
+ SubscribeResponse resp = respBody.getSubscribeResponse();
+ if (resp.hasPreferences()) {
+ preferences = resp.getPreferences();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Receive subscription preferences for (topic:" + pubSubData.topic.toStringUtf8()
+ + ", subscriber:" + pubSubData.subscriberId.toStringUtf8() + ") :"
+ + SubscriptionStateUtils.toString(preferences));
+ }
+ }
+ }
+ }
+
// Store the mapping for the TopicSubscriber to the Channel.
// This is so we can control the starting and stopping of
// message deliveries from the server on that Channel. Store
// this only on a successful ack response from the server.
TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
- responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
+ responseHandler.getSubscriber().setChannelAndPreferencesForTopic(topicSubscriber, channel, preferences);
// Lazily create the Set (from a concurrent hashmap) to keep track
// of outstanding Messages to be consumed by the client app. At this
// stage, delivery for that topic hasn't started yet so creation of
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java?rev=1380268&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java Mon Sep 3 15:14:13 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Handlers used by a subscription.
+ */
+class FilterableMessageHandler implements MessageHandler {
+
+ MessageHandler msgHandler;
+ ClientMessageFilter msgFilter;
+
+ public FilterableMessageHandler(MessageHandler msgHandler,
+ ClientMessageFilter msgFilter) {
+ this.msgHandler = msgHandler;
+ this.msgFilter = msgFilter;
+ }
+
+ public boolean hasMessageHandler() {
+ return null != msgHandler;
+ }
+
+ public MessageHandler getMessageHandler() {
+ return msgHandler;
+ }
+
+ public boolean hasMessageFilter() {
+ return null != msgFilter;
+ }
+
+ public ClientMessageFilter getMessageFilter() {
+ return msgFilter;
+ }
+
+ @Override
+ public void deliver(ByteString topic, ByteString subscriberId, Message msg,
+ Callback<Void> callback, Object context) {
+ boolean deliver = true;
+ if (hasMessageFilter()) {
+ deliver = msgFilter.testMessage(msg);
+ }
+ if (deliver) {
+ msgHandler.deliver(topic, subscriberId, msg, callback, context);
+ } else {
+ callback.operationFinished(context, null);
+ }
+ }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Mon Sep 3 15:14:13 2012
@@ -41,6 +41,7 @@ import org.apache.hedwig.client.data.Top
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
@@ -73,6 +74,8 @@ public class HedwigSubscriber implements
// it. We can also get the ResponseHandler tied to the Channel via the
// Channel Pipeline.
protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+ protected final ConcurrentMap<TopicSubscriber, SubscriptionPreferences> topicSubscriber2Preferences =
+ new ConcurrentHashMap<TopicSubscriber, SubscriptionPreferences>();
// Concurrent Map to store Message handler for each topic + sub id combination.
// Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
@@ -543,6 +546,30 @@ public class HedwigSubscriber implements
startDelivery(topic, subscriberId, messageHandler, false);
}
+ public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
+ MessageHandler messageHandler,
+ ClientMessageFilter messageFilter)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ if (null == messageHandler || null == messageFilter) {
+ throw new NullPointerException("Null message handler or message filter is provided.");
+ }
+ TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+ SubscriptionPreferences preferences = topicSubscriber2Preferences.get(topicSubscriber);
+ if (null == preferences) {
+ throw new ClientNotSubscribedException("No subscription preferences found to filter messages for topic: "
+ + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+ }
+ // pass subscription preferences to message filter
+ if (logger.isDebugEnabled()) {
+ logger.debug("Start delivering messages with filter on topic: " + topic.toStringUtf8()
+ + ", subscriberId: " + subscriberId.toStringUtf8() + ", preferences: "
+ + SubscriptionStateUtils.toString(preferences));
+ }
+ messageFilter.setSubscriptionPreferences(topic, subscriberId, preferences);
+ messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
+ startDelivery(topic, subscriberId, messageHandler, false);
+ }
+
public void restartDelivery(final ByteString topic, final ByteString subscriberId)
throws ClientNotSubscribedException, AlreadyStartDeliveryException {
startDelivery(topic, subscriberId, null, true);
@@ -743,7 +770,8 @@ public class HedwigSubscriber implements
return topicSubscriber2Channel.get(topic);
}
- public void setChannelForTopic(TopicSubscriber topic, Channel channel) {
+ public void setChannelAndPreferencesForTopic(TopicSubscriber topic, Channel channel,
+ SubscriptionPreferences preferences) {
synchronized (closeLock) {
if (closed) {
channel.close();
@@ -753,11 +781,17 @@ public class HedwigSubscriber implements
if (oldc != null) {
channel.close();
}
+ if (null != preferences) {
+ topicSubscriber2Preferences.put(topic, preferences);
+ }
}
}
- public void removeChannelForTopic(TopicSubscriber topic) {
- topicSubscriber2Channel.remove(topic);
+ public void removeTopicSubscriber(TopicSubscriber topic) {
+ synchronized (topic) {
+ topicSubscriber2Preferences.remove(topic);
+ topicSubscriber2Channel.remove(topic);
+ }
}
void close() {
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java (from r1379445, zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java&p1=zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java&r1=1379445&r2=1380268&rev=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java Mon Sep 3 15:14:13 2012
@@ -15,22 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hedwig.server.delivery;
+package org.apache.hedwig.filter;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.filter.MessageFilter;
-
-public interface DeliveryManager {
- public void start();
-
- public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, MessageFilter filter);
-
- public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
-
- /**
- * Stop delivery manager
- */
- public void stop();
+/**
+ * Message Filter running in client-side.
+ */
+public interface ClientMessageFilter extends MessageFilterBase {
}
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java (from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java&r1=1379445&r2=1380268&rev=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java Mon Sep 3 15:14:13 2012
@@ -17,31 +17,11 @@
*/
package org.apache.hedwig.filter;
-import java.io.IOException;
-
import com.google.protobuf.ByteString;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-public interface MessageFilter {
-
- /**
- * Initialize the message filter.
- *
- * @param conf
- * Configuration Object. An <i>MessageFilter</i> might read settings from it.
- * @return message filter
- * @throws IOException when failed to initialize message filter
- */
- public MessageFilter initialize(Configuration conf)
- throws ConfigurationException, IOException;
-
- /**
- * Uninitialize the message filter.
- */
- public void uninitialize();
+public interface MessageFilterBase {
/**
* Set subscription preferences.
@@ -57,8 +37,8 @@ public interface MessageFilter {
* Subscription Preferences.
* @return message filter
*/
- public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences);
+ public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences);
/**
* Tests whether a particular message passes the filter or not
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java Mon Sep 3 15:14:13 2012
@@ -24,19 +24,19 @@ import java.util.LinkedList;
import com.google.protobuf.ByteString;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.filter.MessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
/**
* A filter filters messages in pipeline.
*/
-public class PipelineFilter extends LinkedList<MessageFilter> implements MessageFilter {
+public class PipelineFilter extends LinkedList<ServerMessageFilter>
+implements ServerMessageFilter {
@Override
- public MessageFilter initialize(Configuration conf)
+ public ServerMessageFilter initialize(Configuration conf)
throws ConfigurationException, IOException {
- for (MessageFilter filter : this) {
+ for (ServerMessageFilter filter : this) {
filter.initialize(conf);
}
return this;
@@ -45,15 +45,15 @@ public class PipelineFilter extends Link
@Override
public void uninitialize() {
while (!isEmpty()) {
- MessageFilter filter = removeLast();
+ ServerMessageFilter filter = removeLast();
filter.uninitialize();
}
}
@Override
- public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences) {
- for (MessageFilter filter : this) {
+ public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences) {
+ for (ServerMessageFilter filter : this) {
filter.setSubscriptionPreferences(topic, subscriberId, preferences);
}
return this;
@@ -61,7 +61,7 @@ public class PipelineFilter extends Link
@Override
public boolean testMessage(Message message) {
- for (MessageFilter filter : this) {
+ for (ServerMessageFilter filter : this) {
if (!filter.testMessage(message)) {
return false;
}
Copied: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java (from r1379445, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java?p2=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java&r1=1379445&r2=1380268&rev=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java Mon Sep 3 15:14:13 2012
@@ -19,13 +19,14 @@ package org.apache.hedwig.filter;
import java.io.IOException;
-import com.google.protobuf.ByteString;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-public interface MessageFilter {
+/**
+ * Message Filter running in server-side. Hub server uses reflection to
+ * instantiate a message filter to filter messages.
+ */
+public interface ServerMessageFilter extends MessageFilterBase {
/**
* Initialize the message filter.
@@ -35,7 +36,7 @@ public interface MessageFilter {
* @return message filter
* @throws IOException when failed to initialize message filter
*/
- public MessageFilter initialize(Configuration conf)
+ public ServerMessageFilter initialize(Configuration conf)
throws ConfigurationException, IOException;
/**
@@ -43,28 +44,4 @@ public interface MessageFilter {
*/
public void uninitialize();
- /**
- * Set subscription preferences.
- *
- * <code>preferences</code> of the subscriber will be passed to message filter when
- * the message filter attaches to its subscription either in server-side or client-side.
- *
- * @param topic
- * Topic Name.
- * @param subscriberId
- * Subscriber Id.
- * @param preferences
- * Subscription Preferences.
- * @return message filter
- */
- public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences);
-
- /**
- * Tests whether a particular message passes the filter or not
- *
- * @param message
- * @return
- */
- public boolean testMessage(Message message);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Mon Sep 3 15:14:13 2012
@@ -10593,6 +10593,430 @@ public final class PubSubProtocol {
// @@protoc_insertion_point(class_scope:Hedwig.PublishResponse)
}
+ public interface SubscribeResponseOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional .Hedwig.SubscriptionPreferences preferences = 2;
+ boolean hasPreferences();
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences();
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder();
+ }
+ public static final class SubscribeResponse extends
+ com.google.protobuf.GeneratedMessage
+ implements SubscribeResponseOrBuilder {
+ // Use SubscribeResponse.newBuilder() to construct.
+ private SubscribeResponse(Builder builder) {
+ super(builder);
+ }
+ private SubscribeResponse(boolean noInit) {}
+
+ private static final SubscribeResponse defaultInstance;
+ public static SubscribeResponse getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public SubscribeResponse getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // optional .Hedwig.SubscriptionPreferences preferences = 2;
+ public static final int PREFERENCES_FIELD_NUMBER = 2;
+ private org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences_;
+ public boolean hasPreferences() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences() {
+ return preferences_;
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder() {
+ return preferences_;
+ }
+
+ private void initFields() {
+ preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeMessage(2, preferences_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(2, preferences_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_SubscribeResponse_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getPreferencesFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ if (preferencesBuilder_ == null) {
+ preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+ } else {
+ preferencesBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDescriptor();
+ }
+
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getDefaultInstanceForType() {
+ return org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+ }
+
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse build() {
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse buildPartial() {
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse result = new org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ if (preferencesBuilder_ == null) {
+ result.preferences_ = preferences_;
+ } else {
+ result.preferences_ = preferencesBuilder_.build();
+ }
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse) {
+ return mergeFrom((org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse other) {
+ if (other == org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance()) return this;
+ if (other.hasPreferences()) {
+ mergePreferences(other.getPreferences());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 18: {
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder subBuilder = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.newBuilder();
+ if (hasPreferences()) {
+ subBuilder.mergeFrom(getPreferences());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setPreferences(subBuilder.buildPartial());
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // optional .Hedwig.SubscriptionPreferences preferences = 2;
+ private org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder> preferencesBuilder_;
+ public boolean hasPreferences() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences getPreferences() {
+ if (preferencesBuilder_ == null) {
+ return preferences_;
+ } else {
+ return preferencesBuilder_.getMessage();
+ }
+ }
+ public Builder setPreferences(org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences value) {
+ if (preferencesBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ preferences_ = value;
+ onChanged();
+ } else {
+ preferencesBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder setPreferences(
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder builderForValue) {
+ if (preferencesBuilder_ == null) {
+ preferences_ = builderForValue.build();
+ onChanged();
+ } else {
+ preferencesBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder mergePreferences(org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences value) {
+ if (preferencesBuilder_ == null) {
+ if (((bitField0_ & 0x00000001) == 0x00000001) &&
+ preferences_ != org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance()) {
+ preferences_ =
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.newBuilder(preferences_).mergeFrom(value).buildPartial();
+ } else {
+ preferences_ = value;
+ }
+ onChanged();
+ } else {
+ preferencesBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder clearPreferences() {
+ if (preferencesBuilder_ == null) {
+ preferences_ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.getDefaultInstance();
+ onChanged();
+ } else {
+ preferencesBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder getPreferencesBuilder() {
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return getPreferencesFieldBuilder().getBuilder();
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder getPreferencesOrBuilder() {
+ if (preferencesBuilder_ != null) {
+ return preferencesBuilder_.getMessageOrBuilder();
+ } else {
+ return preferences_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder>
+ getPreferencesFieldBuilder() {
+ if (preferencesBuilder_ == null) {
+ preferencesBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferencesOrBuilder>(
+ preferences_,
+ getParentForChildren(),
+ isClean());
+ preferences_ = null;
+ }
+ return preferencesBuilder_;
+ }
+
+ // @@protoc_insertion_point(builder_scope:Hedwig.SubscribeResponse)
+ }
+
+ static {
+ defaultInstance = new SubscribeResponse(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:Hedwig.SubscribeResponse)
+ }
+
public interface ResponseBodyOrBuilder
extends com.google.protobuf.MessageOrBuilder {
@@ -10600,6 +11024,11 @@ public final class PubSubProtocol {
boolean hasPublishResponse();
org.apache.hedwig.protocol.PubSubProtocol.PublishResponse getPublishResponse();
org.apache.hedwig.protocol.PubSubProtocol.PublishResponseOrBuilder getPublishResponseOrBuilder();
+
+ // optional .Hedwig.SubscribeResponse subscribeResponse = 2;
+ boolean hasSubscribeResponse();
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getSubscribeResponse();
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder getSubscribeResponseOrBuilder();
}
public static final class ResponseBody extends
com.google.protobuf.GeneratedMessage
@@ -10643,8 +11072,22 @@ public final class PubSubProtocol {
return publishResponse_;
}
+ // optional .Hedwig.SubscribeResponse subscribeResponse = 2;
+ public static final int SUBSCRIBERESPONSE_FIELD_NUMBER = 2;
+ private org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse subscribeResponse_;
+ public boolean hasSubscribeResponse() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getSubscribeResponse() {
+ return subscribeResponse_;
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder getSubscribeResponseOrBuilder() {
+ return subscribeResponse_;
+ }
+
private void initFields() {
publishResponse_ = org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.getDefaultInstance();
+ subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -10667,6 +11110,9 @@ public final class PubSubProtocol {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, publishResponse_);
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeMessage(2, subscribeResponse_);
+ }
getUnknownFields().writeTo(output);
}
@@ -10680,6 +11126,10 @@ public final class PubSubProtocol {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, publishResponse_);
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(2, subscribeResponse_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -10797,6 +11247,7 @@ public final class PubSubProtocol {
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getPublishResponseFieldBuilder();
+ getSubscribeResponseFieldBuilder();
}
}
private static Builder create() {
@@ -10811,6 +11262,12 @@ public final class PubSubProtocol {
publishResponseBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
+ if (subscribeResponseBuilder_ == null) {
+ subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+ } else {
+ subscribeResponseBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@@ -10857,6 +11314,14 @@ public final class PubSubProtocol {
} else {
result.publishResponse_ = publishResponseBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ if (subscribeResponseBuilder_ == null) {
+ result.subscribeResponse_ = subscribeResponse_;
+ } else {
+ result.subscribeResponse_ = subscribeResponseBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -10876,6 +11341,9 @@ public final class PubSubProtocol {
if (other.hasPublishResponse()) {
mergePublishResponse(other.getPublishResponse());
}
+ if (other.hasSubscribeResponse()) {
+ mergeSubscribeResponse(other.getSubscribeResponse());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -10922,6 +11390,15 @@ public final class PubSubProtocol {
setPublishResponse(subBuilder.buildPartial());
break;
}
+ case 18: {
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder subBuilder = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.newBuilder();
+ if (hasSubscribeResponse()) {
+ subBuilder.mergeFrom(getSubscribeResponse());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setSubscribeResponse(subBuilder.buildPartial());
+ break;
+ }
}
}
}
@@ -11018,6 +11495,96 @@ public final class PubSubProtocol {
return publishResponseBuilder_;
}
+ // optional .Hedwig.SubscribeResponse subscribeResponse = 2;
+ private org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder> subscribeResponseBuilder_;
+ public boolean hasSubscribeResponse() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse getSubscribeResponse() {
+ if (subscribeResponseBuilder_ == null) {
+ return subscribeResponse_;
+ } else {
+ return subscribeResponseBuilder_.getMessage();
+ }
+ }
+ public Builder setSubscribeResponse(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse value) {
+ if (subscribeResponseBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ subscribeResponse_ = value;
+ onChanged();
+ } else {
+ subscribeResponseBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ public Builder setSubscribeResponse(
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder builderForValue) {
+ if (subscribeResponseBuilder_ == null) {
+ subscribeResponse_ = builderForValue.build();
+ onChanged();
+ } else {
+ subscribeResponseBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ public Builder mergeSubscribeResponse(org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse value) {
+ if (subscribeResponseBuilder_ == null) {
+ if (((bitField0_ & 0x00000002) == 0x00000002) &&
+ subscribeResponse_ != org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance()) {
+ subscribeResponse_ =
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.newBuilder(subscribeResponse_).mergeFrom(value).buildPartial();
+ } else {
+ subscribeResponse_ = value;
+ }
+ onChanged();
+ } else {
+ subscribeResponseBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ public Builder clearSubscribeResponse() {
+ if (subscribeResponseBuilder_ == null) {
+ subscribeResponse_ = org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.getDefaultInstance();
+ onChanged();
+ } else {
+ subscribeResponseBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder getSubscribeResponseBuilder() {
+ bitField0_ |= 0x00000002;
+ onChanged();
+ return getSubscribeResponseFieldBuilder().getBuilder();
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder getSubscribeResponseOrBuilder() {
+ if (subscribeResponseBuilder_ != null) {
+ return subscribeResponseBuilder_.getMessageOrBuilder();
+ } else {
+ return subscribeResponse_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder>
+ getSubscribeResponseFieldBuilder() {
+ if (subscribeResponseBuilder_ == null) {
+ subscribeResponseBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder, org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponseOrBuilder>(
+ subscribeResponse_,
+ getParentForChildren(),
+ isClean());
+ subscribeResponse_ = null;
+ }
+ return subscribeResponseBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:Hedwig.ResponseBody)
}
@@ -14497,6 +15064,11 @@ public final class PubSubProtocol {
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_Hedwig_PublishResponse_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_Hedwig_SubscribeResponse_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_Hedwig_SubscribeResponse_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
internal_static_Hedwig_ResponseBody_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -14598,38 +15170,41 @@ public final class PubSubProtocol {
"\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014responseBody\030" +
"\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017PublishRe" +
"sponse\022,\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig." +
- "MessageSeqId\"@\n\014ResponseBody\0220\n\017publishR" +
- "esponse\030\001 \001(\0132\027.Hedwig.PublishResponse\"N" +
- "\n\021SubscriptionState\022#\n\005msgId\030\001 \002(\0132\024.Hed" +
- "wig.MessageSeqId\022\024\n\014messageBound\030\002 \001(\r\"r" +
- "\n\020SubscriptionData\022(\n\005state\030\001 \001(\0132\031.Hedw" +
- "ig.SubscriptionState\0224\n\013preferences\030\002 \001(" +
- "\0132\037.Hedwig.SubscriptionPreferences\"O\n\013Le",
- "dgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdI" +
- "ncluded\030\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014L" +
- "edgerRanges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.Led" +
- "gerRange\":\n\013ManagerMeta\022\023\n\013managerImpl\030\002" +
- " \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n\013HubInfoD" +
- "ata\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013" +
- "HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017Protoc" +
- "olVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOperationT" +
- "ype\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSU" +
- "ME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020",
- "\004\022\021\n\rSTOP_DELIVERY\020\005*\205\004\n\nStatusCode\022\013\n\007S" +
- "UCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_S" +
- "UCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBE" +
- "D\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COUL" +
- "D_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT" +
- "_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOW" +
- "N\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVALID_ME" +
- "SSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_" +
- "TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSI" +
- "STENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION",
- "_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214" +
- "\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWN" +
- "ER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITIO" +
- "N\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apache.hedwi" +
- "g.protocolH\001"
+ "MessageSeqId\"I\n\021SubscribeResponse\0224\n\013pre" +
+ "ferences\030\002 \001(\0132\037.Hedwig.SubscriptionPref" +
+ "erences\"v\n\014ResponseBody\0220\n\017publishRespon" +
+ "se\030\001 \001(\0132\027.Hedwig.PublishResponse\0224\n\021sub" +
+ "scribeResponse\030\002 \001(\0132\031.Hedwig.SubscribeR" +
+ "esponse\"N\n\021SubscriptionState\022#\n\005msgId\030\001 " +
+ "\002(\0132\024.Hedwig.MessageSeqId\022\024\n\014messageBoun",
+ "d\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005state\030\001 \001" +
+ "(\0132\031.Hedwig.SubscriptionState\0224\n\013prefere" +
+ "nces\030\002 \001(\0132\037.Hedwig.SubscriptionPreferen" +
+ "ces\"O\n\013LedgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020" +
+ "endSeqIdIncluded\030\002 \001(\0132\024.Hedwig.MessageS" +
+ "eqId\"3\n\014LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.H" +
+ "edwig.LedgerRange\":\n\013ManagerMeta\022\023\n\013mana" +
+ "gerImpl\030\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n" +
+ "\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030" +
+ "\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*",
+ "\"\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020\001*p\n\rO" +
+ "perationType\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001" +
+ "\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_" +
+ "DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*\205\004\n\nStatus" +
+ "Code\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221" +
+ "\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_" +
+ "SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224" +
+ "\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020" +
+ "\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SE" +
+ "RVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026I",
+ "NVALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020" +
+ "\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TO" +
+ "PIC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUB" +
+ "SCRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE" +
+ "_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027" +
+ "TOPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED" +
+ "_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apa" +
+ "che.hedwig.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14772,16 +15347,24 @@ public final class PubSubProtocol {
new java.lang.String[] { "PublishedMsgId", },
org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.class,
org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.Builder.class);
- internal_static_Hedwig_ResponseBody_descriptor =
+ internal_static_Hedwig_SubscribeResponse_descriptor =
getDescriptor().getMessageTypes().get(16);
+ internal_static_Hedwig_SubscribeResponse_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_Hedwig_SubscribeResponse_descriptor,
+ new java.lang.String[] { "Preferences", },
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.class,
+ org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder.class);
+ internal_static_Hedwig_ResponseBody_descriptor =
+ getDescriptor().getMessageTypes().get(17);
internal_static_Hedwig_ResponseBody_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_ResponseBody_descriptor,
- new java.lang.String[] { "PublishResponse", },
+ new java.lang.String[] { "PublishResponse", "SubscribeResponse", },
org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.class,
org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.Builder.class);
internal_static_Hedwig_SubscriptionState_descriptor =
- getDescriptor().getMessageTypes().get(17);
+ getDescriptor().getMessageTypes().get(18);
internal_static_Hedwig_SubscriptionState_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionState_descriptor,
@@ -14789,7 +15372,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.Builder.class);
internal_static_Hedwig_SubscriptionData_descriptor =
- getDescriptor().getMessageTypes().get(18);
+ getDescriptor().getMessageTypes().get(19);
internal_static_Hedwig_SubscriptionData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionData_descriptor,
@@ -14797,7 +15380,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.Builder.class);
internal_static_Hedwig_LedgerRange_descriptor =
- getDescriptor().getMessageTypes().get(19);
+ getDescriptor().getMessageTypes().get(20);
internal_static_Hedwig_LedgerRange_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_LedgerRange_descriptor,
@@ -14805,7 +15388,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.class,
org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.Builder.class);
internal_static_Hedwig_LedgerRanges_descriptor =
- getDescriptor().getMessageTypes().get(20);
+ getDescriptor().getMessageTypes().get(21);
internal_static_Hedwig_LedgerRanges_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_LedgerRanges_descriptor,
@@ -14813,7 +15396,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.class,
org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.Builder.class);
internal_static_Hedwig_ManagerMeta_descriptor =
- getDescriptor().getMessageTypes().get(21);
+ getDescriptor().getMessageTypes().get(22);
internal_static_Hedwig_ManagerMeta_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_ManagerMeta_descriptor,
@@ -14821,7 +15404,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.class,
org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.Builder.class);
internal_static_Hedwig_HubInfoData_descriptor =
- getDescriptor().getMessageTypes().get(22);
+ getDescriptor().getMessageTypes().get(23);
internal_static_Hedwig_HubInfoData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_HubInfoData_descriptor,
@@ -14829,7 +15412,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.class,
org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.Builder.class);
internal_static_Hedwig_HubLoadData_descriptor =
- getDescriptor().getMessageTypes().get(23);
+ getDescriptor().getMessageTypes().get(24);
internal_static_Hedwig_HubLoadData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_HubLoadData_descriptor,
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java Mon Sep 3 15:14:13 2012
@@ -20,6 +20,7 @@ package org.apache.hedwig.protoextension
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
public class PubSubResponseUtils {
@@ -37,7 +38,13 @@ public class PubSubResponseUtils {
return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId).build();
}
+ public static PubSubResponse getSuccessResponse(long txnId, ResponseBody respBody) {
+ return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId)
+ .setResponseBody(respBody).build();
+ }
+
public static PubSubResponse getResponseForException(PubSubException e, long txnId) {
return getBasicBuilder(e.getCode()).setStatusMsg(e.getMessage()).setTxnId(txnId).build();
}
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Mon Sep 3 15:14:13 2012
@@ -193,8 +193,13 @@ message PublishResponse {
required MessageSeqId publishedMsgId = 1;
}
+message SubscribeResponse {
+ optional SubscriptionPreferences preferences = 2;
+}
+
message ResponseBody {
optional PublishResponse publishResponse = 1;
+ optional SubscribeResponse subscribeResponse = 2;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Mon Sep 3 15:14:13 2012
@@ -19,13 +19,13 @@ package org.apache.hedwig.server.deliver
import com.google.protobuf.ByteString;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
public interface DeliveryManager {
public void start();
public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, MessageFilter filter);
+ DeliveryEndPoint endPoint, ServerMessageFilter filter);
public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Sep 3 15:14:13 2012
@@ -36,7 +36,7 @@ import com.google.protobuf.ByteString;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
@@ -137,7 +137,7 @@ public class FIFODeliveryManager impleme
* subscriber
*/
public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, MessageFilter filter) {
+ DeliveryEndPoint endPoint, ServerMessageFilter filter) {
ActiveSubscriberState subscriber = new ActiveSubscriberState(topic, subscriberId, seqIdToStartFrom
.getLocalComponent() - 1, endPoint, filter);
@@ -299,13 +299,13 @@ public class FIFODeliveryManager impleme
long lastScanErrorTime = -1;
long localSeqIdDeliveringNow;
long lastSeqIdCommunicatedExternally;
- MessageFilter filter;
+ ServerMessageFilter filter;
// TODO make use of these variables
final static int SEQ_ID_SLACK = 10;
public ActiveSubscriberState(ByteString topic, ByteString subscriberId, long lastLocalSeqIdDelivered,
- DeliveryEndPoint deliveryEndPoint, MessageFilter filter) {
+ DeliveryEndPoint deliveryEndPoint, ServerMessageFilter filter) {
this.topic = topic;
this.subscriberId = subscriberId;
this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
@@ -316,6 +316,8 @@ public class FIFODeliveryManager impleme
public void setNotConnected() {
this.connected = false;
deliveryEndPoint.close();
+ // uninitialize filter
+ this.filter.uninitialize();
}
public ByteString getTopic() {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Mon Sep 3 15:14:13 2012
@@ -31,12 +31,14 @@ import org.apache.bookkeeper.util.Reflec
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.filter.MessageFilter;
import org.apache.hedwig.filter.PipelineFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
@@ -164,7 +166,7 @@ public class SubscribeHandler extends Ba
if (subData.hasPreferences() &&
subData.getPreferences().hasMessageFilter()) {
String messageFilterName = subData.getPreferences().getMessageFilter();
- filter.addLast(ReflectionUtils.newInstance(messageFilterName, MessageFilter.class));
+ filter.addLast(ReflectionUtils.newInstance(messageFilterName, ServerMessageFilter.class));
}
// initialize the filter
filter.initialize(cfg.getConf());
@@ -194,7 +196,14 @@ public class SubscribeHandler extends Ba
// First write success and then tell the delivery manager,
// otherwise the first message might go out before the response
// to the subscribe
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+ SubscribeResponse.Builder subRespBuilder = SubscribeResponse.newBuilder()
+ .setPreferences(subData.getPreferences());
+ ResponseBody respBody = ResponseBody.newBuilder()
+ .setSubscribeResponse(subRespBuilder).build();
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId(), respBody));
+ logger.info("Subscribe request (" + request.getTxnId() + ") for (topic:" + topic.toStringUtf8()
+ + ", subscriber:" + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
+ + " succeed - its subscription data is " + SubscriptionStateUtils.toString(subData));
subStats.updateLatency(MathUtils.now() - requestTime);
// want to start 1 ahead of the consume ptr
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java Mon Sep 3 15:14:13 2012
@@ -22,19 +22,20 @@ import java.io.IOException;
import com.google.protobuf.ByteString;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.MessageFilterBase;
+import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
-public class AllToAllTopologyFilter implements MessageFilter {
+public class AllToAllTopologyFilter implements ServerMessageFilter {
ByteString subscriberRegion;
boolean isHubSubscriber;
@Override
- public MessageFilter initialize(Configuration conf)
+ public ServerMessageFilter initialize(Configuration conf)
throws ConfigurationException, IOException {
String region = conf.getString(ServerConfiguration.REGION, "standalone");
if (null == region) {
@@ -50,8 +51,8 @@ public class AllToAllTopologyFilter impl
}
@Override
- public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences) {
+ public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences) {
isHubSubscriber = SubscriptionStateUtils.isHubSubscriber(subscriberId);
return this;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Mon Sep 3 15:14:13 2012
@@ -19,6 +19,7 @@ package org.apache.hedwig.server;
import java.net.InetAddress;
import java.io.File;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -33,6 +34,8 @@ import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -658,6 +661,78 @@ public class TestBackwardCompat extends
subscriber.closeSubscription(topic, subid);
}
+ void receiveNumModM(final ByteString topic, final ByteString subid,
+ final int start, final int num, final int M) throws Exception {
+ org.apache.hedwig.filter.ServerMessageFilter filter =
+ new org.apache.hedwig.filter.ServerMessageFilter() {
+
+ @Override
+ public org.apache.hedwig.filter.ServerMessageFilter
+ initialize(Configuration conf) {
+ // do nothing
+ return this;
+ }
+
+ @Override
+ public void uninitialize() {
+ // do nothing;
+ }
+
+ @Override
+ public org.apache.hedwig.filter.MessageFilterBase
+ setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences) {
+ // do nothing;
+ return this;
+ }
+
+ @Override
+ public boolean testMessage(org.apache.hedwig.protocol.PubSubProtocol.Message msg) {
+ int value = Integer.valueOf(msg.getBody().toStringUtf8());
+ return 0 == value % M;
+ }
+ };
+ filter.initialize(conf.getConf());
+
+ subscriber.subscribe(topic, subid, org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH);
+ final int base = start + M - start % M;
+ final AtomicInteger expected = new AtomicInteger(base);
+ final CountDownLatch latch = new CountDownLatch(1);
+ subscriber.startDeliveryWithFilter(topic, subid, new org.apache.hedwig.client.api.MessageHandler() {
+ synchronized public void deliver(ByteString topic, ByteString subscriberId,
+ org.apache.hedwig.protocol.PubSubProtocol.Message msg,
+ org.apache.hedwig.util.Callback<Void> callback, Object context) {
+ try {
+ int value = Integer.valueOf(msg.getBody().toStringUtf8());
+ // duplicated messages received, ignore them
+ if (value > start) {
+ if (value == expected.get()) {
+ expected.addAndGet(M);
+ } else {
+ logger.error("Did not receive expected value, expected {}, got {}",
+ expected.get(), value);
+ expected.set(0);
+ latch.countDown();
+ }
+ if (expected.get() == (base + num * M)) {
+ latch.countDown();
+ }
+ }
+ callback.operationFinished(context, null);
+ } catch (Exception e) {
+ logger.error("Received bad message", e);
+ latch.countDown();
+ }
+ }
+ }, (org.apache.hedwig.filter.ClientMessageFilter) filter);
+ assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(),
+ latch.await(10, TimeUnit.SECONDS));
+ assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get());
+ subscriber.stopDelivery(topic, subid);
+ filter.uninitialize();
+ subscriber.closeSubscription(topic, subid);
+ }
+
void subscribe(ByteString topic, ByteString subscriberId) throws Exception {
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
@@ -918,4 +993,40 @@ public class TestBackwardCompat extends
bkccur.stop();
}
+ /**
+ * Test compatability between version 4.1.0 and the current version.
+ *
+ * A current client running message filter would fail on 4.1.0 hub servers.
+ */
+ @Test
+ public void testClientMessageFilterCompat410() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410");
+ ByteString subid = ByteString.copyFromUtf8("mysub");
+
+ // start bookkeeper
+ BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
+ bkc410.start();
+
+ // start hub server 410
+ Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString());
+ s410.start();
+
+ ClientCurrent ccur = new ClientCurrent();
+ ccur.subscribe(topic, subid);
+ ccur.closeSubscription(topic, subid);
+
+ ccur.publishInts(topic, 0, 100);
+ try {
+ ccur.receiveNumModM(topic, subid, 0, 50, 2);
+ fail("client-side filter could not run on 4.1.0 hub server");
+ } catch (Exception e) {
+ logger.info("Should fail to run client-side message filter on 4.1.0 hub server.", e);
+ ccur.closeSubscription(topic, subid);
+ }
+
+ // stop 410 server
+ s410.stop();
+ // stop bookkeeper cluster
+ bkc410.stop();
+ }
}