You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/27 01:52:20 UTC
svn commit: r1390777 [2/4] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/data/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Wed Sep 26 23:52:18 2012
@@ -17,22 +17,12 @@
*/
package org.apache.hedwig.client.netty;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.protocol.PubSubProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.exceptions.PubSubException;
@@ -40,9 +30,8 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.util.Callback;
/**
@@ -53,27 +42,19 @@ public class HedwigPublisher implements
private static Logger logger = LoggerFactory.getLogger(HedwigPublisher.class);
- // Concurrent Map to store the mappings for a given Host (Hostname:Port) to
- // the Channel that has been established for it previously. This channel
- // will be used whenever we publish on a topic that the server is the master
- // of currently. The channels used here will only be used for publish and
- // unsubscribe requests.
- protected final ConcurrentMap<InetSocketAddress, Channel> host2Channel = new ConcurrentHashMap<InetSocketAddress, Channel>();
-
- private final HedwigClientImpl client;
- private final ClientConfiguration cfg;
- private boolean closed = false;
+ private final HChannelManager channelManager;
protected HedwigPublisher(HedwigClientImpl client) {
- this.client = client;
- this.cfg = client.getConfiguration();
+ this.channelManager = client.getHChannelManager();
}
- public PubSubProtocol.PublishResponse publish(ByteString topic, Message msg)
+ public PublishResponse publish(ByteString topic, Message msg)
throws CouldNotConnectException, ServiceDownException {
- if (logger.isDebugEnabled())
- logger.debug("Calling a sync publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Calling a sync publish for topic: {}, msg: {}.",
+ topic.toStringUtf8(), msg);
+ }
PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, null, null);
synchronized (pubSubData) {
PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
@@ -105,192 +86,61 @@ public class HedwigPublisher implements
} else {
// For other types of PubSubExceptions, just throw a generic
// ServiceDownException but log a warning message.
- logger.error("Unexpected exception type when a sync publish operation failed: " + failureException);
+ logger.error("Unexpected exception type when a sync publish operation failed: ",
+ failureException);
throw new ServiceDownException("Server ack response to publish request is not successful");
}
}
- PubSubProtocol.ResponseBody respBody = pubSubCallback.getResponseBody();
- if (null == respBody) return null;
+ ResponseBody respBody = pubSubCallback.getResponseBody();
+ if (null == respBody) {
+ return null;
+ }
return respBody.hasPublishResponse() ? respBody.getPublishResponse() : null;
}
}
- public void asyncPublish(ByteString topic, Message msg, final Callback<Void> callback, Object context) {
+ public void asyncPublish(ByteString topic, Message msg,
+ final Callback<Void> callback, Object context) {
asyncPublishWithResponseImpl(topic, msg,
- new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context);
+ new VoidCallbackAdapter<ResponseBody>(callback), context);
}
public void asyncPublishWithResponse(ByteString topic, Message msg,
- Callback<PubSubProtocol.PublishResponse> _callback, Object context) {
+ Callback<PublishResponse> callback,
+ Object context) {
// adapt the callback.
- asyncPublishWithResponseImpl(topic, msg, new PublishResponseCallbackAdapter(_callback), context);
+ asyncPublishWithResponseImpl(topic, msg,
+ new PublishResponseCallbackAdapter(callback), context);
}
private void asyncPublishWithResponseImpl(ByteString topic, Message msg,
- Callback<PubSubProtocol.ResponseBody> callback, Object context) {
-
- if (logger.isDebugEnabled())
- logger.debug("Calling an async publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
- // Check if we already have a Channel connection set up to the server
- // for the given Topic.
- PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, callback, context);
- InetSocketAddress host = client.topic2Host.get(topic);
- if (host != null) {
- Channel channel = host2Channel.get(host);
- if (channel != null) {
- // We already have the Channel connection for the server host so
- // do the publish directly. We will deal with redirect logic
- // later on if that server is no longer the current host for
- // the topic.
- doPublish(pubSubData, channel);
- } else {
- // We have a mapping for the topic to host but don't have a
- // Channel for that server. This can happen if the Channel
- // is disconnected for some reason. Do the connect then to
- // the specified server host to create a new Channel connection.
- client.doConnect(pubSubData, host);
- }
- } else {
- // Server host for the given topic is not known yet so use the
- // default server host/port as defined in the configs. This should
- // point to the server VIP which would redirect to a random server
- // (which might not be the server hosting the topic).
- host = cfg.getDefaultServerHost();
- Channel channel = host2Channel.get(host);
- if (channel != null) {
- // if there is a channel to default server, use it!
- doPublish(pubSubData, channel);
- return;
- }
- client.doConnect(pubSubData, host);
- }
- }
-
- /**
- * This is a helper method to write the actual publish message once the
- * client is connected to the server and a Channel is available.
- *
- * @param pubSubData
- * Publish call's data wrapper object
- * @param channel
- * Netty I/O channel for communication between the client and
- * server
- */
- protected void doPublish(PubSubData pubSubData, Channel channel) {
- // Create a PubSubRequest
- PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
- pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
- pubsubRequestBuilder.setType(OperationType.PUBLISH);
- if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
- pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
- }
- long txnId = client.globalCounter.incrementAndGet();
- pubsubRequestBuilder.setTxnId(txnId);
- pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
- pubsubRequestBuilder.setTopic(pubSubData.topic);
-
- // Now create the PublishRequest
- PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder();
-
- publishRequestBuilder.setMsg(pubSubData.msg);
-
- // Set the PublishRequest into the outer PubSubRequest
- pubsubRequestBuilder.setPublishRequest(publishRequestBuilder);
-
- // Update the PubSubData with the txnId and the requestWriteTime
- pubSubData.txnId = txnId;
- pubSubData.requestWriteTime = MathUtils.now();
-
- // Before we do the write, store this information into the
- // ResponseHandler so when the server responds, we know what
- // appropriate Callback Data to invoke for the given txn ID.
- try {
- HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
- } catch (NoResponseHandlerException e) {
- logger.error("No response handler found while storing the publish callback.");
- // Callback on pubsubdata indicating failure.
- pubSubData.getCallback().operationFailed(pubSubData.context, new CouldNotConnectException("No response " +
- "handler found while attempting to publish. So could not connect."));
- return;
- }
-
- // Finally, write the Publish request through the Channel.
- if (logger.isDebugEnabled())
- logger.debug("Writing a Publish request to host: " + HedwigClientImpl.getHostFromChannel(channel)
- + " for pubSubData: " + pubSubData);
- ChannelFuture future = channel.write(pubsubRequestBuilder.build());
- future.addListener(new WriteCallback(pubSubData, client));
- }
-
- // Synchronized method to store the host2Channel mapping (if it doesn't
- // exist yet). Retrieve the hostname info from the Channel created via the
- // RemoteAddress tied to it.
- protected synchronized void storeHost2ChannelMapping(Channel channel) {
- InetSocketAddress host = HedwigClientImpl.getHostFromChannel(channel);
- if (!closed && host2Channel.putIfAbsent(host, channel) == null) {
- if (logger.isDebugEnabled())
- logger.debug("Stored a new Channel mapping for host: " + host);
- } else {
- // If we've reached here, that means we already have a Channel
- // mapping for the given host. This should ideally not happen
- // and it means we are creating another Channel to a server host
- // to publish on when we could have used an existing one. This could
- // happen due to a race condition if initially multiple concurrent
- // threads are publishing on the same topic and no Channel exists
- // currently to the server. We are not synchronizing this initial
- // creation of Channels to a given host for performance.
- // Another possible way to have redundant Channels created is if
- // a new topic is being published to, we connect to the default
- // server host which should be a VIP that redirects to a "real"
- // server host. Since we don't know beforehand what is the full
- // set of server hosts, we could be redirected to a server that
- // we already have a channel connection to from a prior existing
- // topic. Close these redundant channels as they won't be used.
- logger.debug("Channel mapping to host: {} already exists so no need to store it.", host);
- try {
- HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
- } catch (NoResponseHandlerException e) {
- logger.error("Could not get response handler while closing channel.");
- }
- channel.close();
- }
- }
-
- // Public getter for entries in the host2Channel Map.
- // This is used for classes that need this information but are not in the
- // same classpath.
- public Channel getChannelForHost(InetSocketAddress host) {
- return host2Channel.get(host);
- }
-
- void close() {
- synchronized(this) {
- closed = true;
+ Callback<ResponseBody> callback,
+ Object context) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Calling an async publish for topic: {}, msg: {}.",
+ topic.toStringUtf8(), msg);
}
- for (Channel channel : host2Channel.values()) {
- try {
- client.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
- } catch (NoResponseHandlerException e) {
- logger.error("No response handler while trying explicitly close Publisher channel " + channel);
- }
- channel.close().awaitUninterruptibly();
- }
- host2Channel.clear();
+ PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null,
+ callback, context);
+ channelManager.submitOp(pubSubData);
}
- private static class PublishResponseCallbackAdapter implements Callback<PubSubProtocol.ResponseBody>{
+ private static class PublishResponseCallbackAdapter implements Callback<ResponseBody>{
- private final Callback<PubSubProtocol.PublishResponse> delegate;
+ private final Callback<PublishResponse> delegate;
- private PublishResponseCallbackAdapter(Callback<PubSubProtocol.PublishResponse> delegate) {
+ private PublishResponseCallbackAdapter(Callback<PublishResponse> delegate) {
this.delegate = delegate;
}
@Override
- public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
- if (null == resultOfOperation) delegate.operationFinished(ctx, null);
- else delegate.operationFinished(ctx, resultOfOperation.getPublishResponse());
+ public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
+ if (null == resultOfOperation) {
+ delegate.operationFinished(ctx, null);
+ } else {
+ delegate.operationFinished(ctx, resultOfOperation.getPublishResponse());
+ }
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1390777&r1=1390776&r2=1390777&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Wed Sep 26 23:52:18 2012
@@ -17,25 +17,13 @@
*/
package org.apache.hedwig.client.netty;
-import java.net.InetSocketAddress;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.protocol.PubSubProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.conf.ClientConfiguration;
@@ -44,22 +32,17 @@ import org.apache.hedwig.client.data.Top
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
@@ -73,46 +56,22 @@ public class HedwigSubscriber implements
private static Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
- // Concurrent Map to store the cached Channel connections on the client side
- // to a server host for a given Topic + SubscriberId combination. For each
- // TopicSubscriber, we want a unique Channel connection to the server for
- // it. We can also get the ResponseHandler tied to the Channel via the
- // Channel Pipeline.
- protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
- protected final ConcurrentMap<TopicSubscriber, SubscriptionPreferences> topicSubscriber2Preferences =
- new ConcurrentHashMap<TopicSubscriber, SubscriptionPreferences>();
-
- // Concurrent Map to store Message handler for each topic + sub id combination.
- // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
- // user set when connection is recovered
- protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler= new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
-
- protected final CopyOnWriteArraySet<SubscriptionListener> listeners;
-
- protected final HedwigClientImpl client;
protected final ClientConfiguration cfg;
- private final Object closeLock = new Object();
- private boolean closed = false;
+ protected final HChannelManager channelManager;
public HedwigSubscriber(HedwigClientImpl client) {
- this.client = client;
this.cfg = client.getConfiguration();
- this.listeners = new CopyOnWriteArraySet<SubscriptionListener>();
+ this.channelManager = client.getHChannelManager();
}
public void addSubscriptionListener(SubscriptionListener listener) {
- listeners.add(listener);
+ channelManager.getSubscriptionEventEmitter()
+ .addSubscriptionListener(listener);
}
public void removeSubscriptionListener(SubscriptionListener listener) {
- listeners.remove(listener);
- }
-
- void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event) {
- for (SubscriptionListener listener : listeners) {
- listener.processEvent(topic, subscriberId, event);
- }
+ channelManager.getSubscriptionEventEmitter()
+ .removeSubscriptionListener(listener);
}
// Private method that holds the common logic for doing synchronous
@@ -167,7 +126,7 @@ public class HedwigSubscriber implements
else if (failureException instanceof ServiceDownException)
throw (ServiceDownException) failureException;
else {
- logger.error("Unexpected PubSubException thrown: " + failureException.toString());
+ logger.error("Unexpected PubSubException thrown: ", failureException);
// Throw a generic ServiceDownException but wrap the
// original PubSubException within it.
throw new ServiceDownException(failureException);
@@ -181,7 +140,7 @@ public class HedwigSubscriber implements
// flows are very similar. The assumption is that the input OperationType is
// either SUBSCRIBE or UNSUBSCRIBE.
private void asyncSubUnsub(ByteString topic, ByteString subscriberId,
- Callback<PubSubProtocol.ResponseBody> callback, Object context,
+ Callback<ResponseBody> callback, Object context,
OperationType operationType, SubscriptionOptions options) {
if (logger.isDebugEnabled()) {
StringBuilder debugMsg = new StringBuilder().append("Calling a async subUnsub request for topic: ")
@@ -194,41 +153,18 @@ public class HedwigSubscriber implements
}
logger.debug(debugMsg.toString());
}
- // Check if we know which server host is the master for the topic we are
- // subscribing to.
- PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, callback,
- context);
-
- InetSocketAddress host = client.topic2Host.get(topic);
- if (host != null) {
- Channel existingChannel = null;
- if (operationType.equals(OperationType.UNSUBSCRIBE) &&
- (existingChannel = client.getPublisher().host2Channel.get(host)) != null) {
- // For unsubscribes, we can reuse the channel connections to the
- // server host that are cached for publishes. For publish and
- // unsubscribe flows, we will thus use the same Channels and
- // will cache and store them during the ConnectCallback.
- doSubUnsub(pubSubData, existingChannel);
- } else {
- // We know which server host is the master for the topic so
- // connect to that first. For subscribes, we want a new channel
- // connection each time for the TopicSubscriber. If the
- // TopicSubscriber is already connected and subscribed,
- // we assume the server will respond with an appropriate status
- // indicating this. For unsubscribes, it is possible that the
- // client is subscribed to the topic already but does not
- // have a Channel connection yet to the server host. e.g. Client
- // goes down and comes back up but client side soft state memory
- // does not have the netty Channel connection anymore.
- client.doConnect(pubSubData, host);
+ if (OperationType.SUBSCRIBE.equals(operationType)) {
+ if (options.getMessageBound() <= 0 &&
+ cfg.getSubscriptionMessageBound() > 0) {
+ SubscriptionOptions.Builder soBuilder =
+ SubscriptionOptions.newBuilder(options).setMessageBound(
+ cfg.getSubscriptionMessageBound());
+ options = soBuilder.build();
}
- } else {
- // Server host for the given topic is not known yet so use the
- // default server host/port as defined in the configs. This should
- // point to the server VIP which would redirect to a random server
- // (which might not be the server hosting the topic).
- client.doConnect(pubSubData, cfg.getDefaultServerHost());
}
+ PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType,
+ options, callback, context);
+ channelManager.submitOp(pubSubData);
}
public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
@@ -256,7 +192,7 @@ public class HedwigSubscriber implements
try {
subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
} catch (ClientNotSubscribedException e) {
- logger.error("Unexpected Exception thrown: " + e.toString());
+ logger.error("Unexpected Exception thrown: ", e);
// This exception should never be thrown here. But just in case,
// throw a generic ServiceDownException but wrap the original
// Exception within it.
@@ -285,8 +221,9 @@ public class HedwigSubscriber implements
"SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
return;
}
- asyncSubUnsub(topic, subscriberId, new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback),
- context, OperationType.SUBSCRIBE, options);
+ asyncSubUnsub(topic, subscriberId,
+ new VoidCallbackAdapter<ResponseBody>(callback), context,
+ OperationType.SUBSCRIBE, options);
}
public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
@@ -312,7 +249,7 @@ public class HedwigSubscriber implements
try {
subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
} catch (ClientAlreadySubscribedException e) {
- logger.error("Unexpected Exception thrown: " + e.toString());
+ logger.error("Unexpected Exception thrown: ", e);
// This exception should never be thrown here. But just in case,
// throw a generic ServiceDownException but wrap the original
// Exception within it.
@@ -320,20 +257,22 @@ public class HedwigSubscriber implements
}
}
- public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
- final Object context) {
+ public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
+ final Callback<Void> callback, final Object context) {
doAsyncUnsubscribe(topic, subscriberId,
- new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context, false);
+ new VoidCallbackAdapter<ResponseBody>(callback),
+ context, false);
}
protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
final Callback<Void> callback, final Object context, boolean isHub) {
doAsyncUnsubscribe(topic, subscriberId,
- new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context, isHub);
+ new VoidCallbackAdapter<ResponseBody>(callback),
+ context, isHub);
}
private void doAsyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
- final Callback<PubSubProtocol.ResponseBody> callback,
+ final Callback<ResponseBody> callback,
final Object context, boolean isHub) {
// Validate that the format of the subscriberId is valid either as a
// local or hub subscriber.
@@ -344,9 +283,9 @@ public class HedwigSubscriber implements
}
// Asynchronously close the subscription. On the callback to that
// operation once it completes, post the async unsubscribe request.
- doAsyncCloseSubscription(topic, subscriberId, new Callback<PubSubProtocol.ResponseBody>() {
+ doAsyncCloseSubscription(topic, subscriberId, new Callback<ResponseBody>() {
@Override
- public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
+ public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
}
@@ -369,184 +308,22 @@ public class HedwigSubscriber implements
public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
throws ClientNotSubscribedException {
- if (logger.isDebugEnabled())
- logger.debug("Calling consume for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+ logger.debug("Calling consume for {}, messageSeqId: {}.",
+ topicSubscriber, messageSeqId);
+
+ SubscribeResponseHandler subscribeResponseHandler =
+ channelManager.getSubscribeResponseHandler(topicSubscriber);
// Check that this topic subscription on the client side exists.
- Channel channel = topicSubscriber2Channel.get(topicSubscriber);
- if (channel == null) {
+ if (null == subscribeResponseHandler ||
+ !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
throw new ClientNotSubscribedException(
- "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
+ "Cannot send consume message since client is not subscribed to topic: "
+ + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
}
- PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
// Send the consume message to the server using the same subscribe
// channel that the topic subscription uses.
- doConsume(pubSubData, channel, messageSeqId);
- }
-
- /**
- * Convert client-side subscription options to subscription preferences
- *
- * @param options
- * Client-Side subscription options
- */
- protected SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
- // prepare subscription preferences
- SubscriptionPreferences.Builder preferencesBuilder = SubscriptionPreferences.newBuilder();
-
- // set message bound
- if (options.getMessageBound() > 0) {
- preferencesBuilder.setMessageBound(options.getMessageBound());
- } else if (cfg.getSubscriptionMessageBound() > 0) {
- preferencesBuilder.setMessageBound(cfg.getSubscriptionMessageBound());
- }
-
- // set message filter
- if (options.hasMessageFilter()) {
- preferencesBuilder.setMessageFilter(options.getMessageFilter());
- }
-
- // set user options
- if (options.hasOptions()) {
- preferencesBuilder.setOptions(options.getOptions());
- }
-
- return preferencesBuilder;
- }
-
- /**
- * This is a helper method to write the actual subscribe/unsubscribe message
- * once the client is connected to the server and a Channel is available.
- *
- * @param pubSubData
- * Subscribe/Unsubscribe call's data wrapper object. We assume
- * that the operationType field is either SUBSCRIBE or
- * UNSUBSCRIBE.
- * @param channel
- * Netty I/O channel for communication between the client and
- * server
- */
- protected void doSubUnsub(PubSubData pubSubData, Channel channel) {
- // Create a PubSubRequest
- PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
- pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
- pubsubRequestBuilder.setType(pubSubData.operationType);
- if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
- pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
- }
- long txnId = client.globalCounter.incrementAndGet();
- pubsubRequestBuilder.setTxnId(txnId);
- pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
- pubsubRequestBuilder.setTopic(pubSubData.topic);
-
- // Create either the Subscribe or Unsubscribe Request
- if (pubSubData.operationType.equals(OperationType.SUBSCRIBE)) {
- // Create the SubscribeRequest
- SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
- subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
- subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
- subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach());
- // For now, all subscribes should wait for all cross-regional
- // subscriptions to be established before returning.
- subscribeRequestBuilder.setSynchronous(true);
- // set subscription preferences
- SubscriptionPreferences.Builder preferencesBuilder =
- options2Preferences(pubSubData.options);
- // backward compatable with 4.1.0
- if (preferencesBuilder.hasMessageBound()) {
- subscribeRequestBuilder.setMessageBound(preferencesBuilder.getMessageBound());
- }
- subscribeRequestBuilder.setPreferences(preferencesBuilder);
-
- // Set the SubscribeRequest into the outer PubSubRequest
- pubsubRequestBuilder.setSubscribeRequest(subscribeRequestBuilder);
- } else {
- // Create the UnSubscribeRequest
- UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder();
- unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-
- // Set the UnsubscribeRequest into the outer PubSubRequest
- pubsubRequestBuilder.setUnsubscribeRequest(unsubscribeRequestBuilder);
- }
-
- // Update the PubSubData with the txnId and the requestWriteTime
- pubSubData.txnId = txnId;
- pubSubData.requestWriteTime = MathUtils.now();
-
- // Before we do the write, store this information into the
- // ResponseHandler so when the server responds, we know what
- // appropriate Callback Data to invoke for the given txn ID.
- try {
- HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
- } catch (Exception e) {
- logger.error("No response handler found while storing the subscribe callback.");
- // Call operationFailed on the pubsubdata callback to indicate failure
- pubSubData.getCallback().operationFailed(pubSubData.context, new CouldNotConnectException("No response " +
- "handler found while attempting to subscribe."));
- return;
- }
-
- // Finally, write the Subscribe request through the Channel.
- if (logger.isDebugEnabled())
- logger.debug("Writing a SubUnsub request to host: " + HedwigClientImpl.getHostFromChannel(channel)
- + " for pubSubData: " + pubSubData);
- ChannelFuture future = channel.write(pubsubRequestBuilder.build());
- future.addListener(new WriteCallback(pubSubData, client));
- }
-
- /**
- * This is a helper method to write a consume message to the server after a
- * subscribe Channel connection is made to the server and messages are being
- * consumed by the client.
- *
- * @param pubSubData
- * Consume call's data wrapper object. We assume that the
- * operationType field is CONSUME.
- * @param channel
- * Netty I/O channel for communication between the client and
- * server
- * @param messageSeqId
- * Message Seq ID for the latest/last message the client has
- * consumed.
- */
- public void doConsume(final PubSubData pubSubData, final Channel channel, final MessageSeqId messageSeqId) {
- // Create a PubSubRequest
- PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
- pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
- pubsubRequestBuilder.setType(OperationType.CONSUME);
- long txnId = client.globalCounter.incrementAndGet();
- pubsubRequestBuilder.setTxnId(txnId);
- pubsubRequestBuilder.setTopic(pubSubData.topic);
-
- // Create the ConsumeRequest
- ConsumeRequest.Builder consumeRequestBuilder = ConsumeRequest.newBuilder();
- consumeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
- consumeRequestBuilder.setMsgId(messageSeqId);
-
- // Set the ConsumeRequest into the outer PubSubRequest
- pubsubRequestBuilder.setConsumeRequest(consumeRequestBuilder);
-
- // For Consume requests, we will send them from the client in a fire and
- // forget manner. We are not expecting the server to send back an ack
- // response so no need to register this in the ResponseHandler. There
- // are no callbacks to invoke since this isn't a client initiated
- // action. Instead, just have a future listener that will log an error
- // message if there was a problem writing the consume request.
- if (logger.isDebugEnabled())
- logger.debug("Writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel)
- + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
- ChannelFuture future = channel.write(pubsubRequestBuilder.build());
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Error writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel)
- + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
- }
- }
- });
+ subscribeResponseHandler.consume(topicSubscriber, messageSeqId);
}
public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
@@ -559,7 +336,11 @@ public class HedwigSubscriber implements
// correct way to contact the server to get this info is then.
// The client side just has soft memory state for client subscription
// information.
- return topicSubscriber2Channel.containsKey(new TopicSubscriber(topic, subscriberId));
+ TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+ SubscribeResponseHandler subscribeResponseHandler =
+ channelManager.getSubscribeResponseHandler(topicSubscriber);
+ return !(null == subscribeResponseHandler ||
+ !subscribeResponseHandler.hasSubscription(topicSubscriber));
}
public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
@@ -569,9 +350,12 @@ public class HedwigSubscriber implements
return null;
}
- public void startDelivery(final ByteString topic, final ByteString subscriberId, MessageHandler messageHandler)
+ public void startDelivery(final ByteString topic, final ByteString subscriberId,
+ MessageHandler messageHandler)
throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- startDelivery(topic, subscriberId, messageHandler, false);
+ TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
+ logger.debug("Starting delivery for {}.", topicSubscriber);
+ channelManager.startDelivery(topicSubscriber, messageHandler);
}
public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
@@ -579,147 +363,19 @@ public class HedwigSubscriber implements
ClientMessageFilter messageFilter)
throws ClientNotSubscribedException, AlreadyStartDeliveryException {
if (null == messageHandler || null == messageFilter) {
- throw new NullPointerException("Null message handler or message filter is provided.");
+ throw new NullPointerException("Null message handler or message filter is provided.");
}
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- SubscriptionPreferences preferences = topicSubscriber2Preferences.get(topicSubscriber);
- if (null == preferences) {
- throw new ClientNotSubscribedException("No subscription preferences found to filter messages for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
- }
- // pass subscription preferences to message filter
- if (logger.isDebugEnabled()) {
- logger.debug("Start delivering messages with filter on topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8() + ", preferences: "
- + SubscriptionStateUtils.toString(preferences));
- }
- messageFilter.setSubscriptionPreferences(topic, subscriberId, preferences);
messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
- startDelivery(topic, subscriberId, messageHandler, false);
- }
-
- public void restartDelivery(final ByteString topic, final ByteString subscriberId)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- startDelivery(topic, subscriberId, null, true);
+ logger.debug("Starting delivery with filter for {}.", topicSubscriber);
+ channelManager.startDelivery(topicSubscriber, messageHandler);
}
- private void startDelivery(final ByteString topic, final ByteString subscriberId,
- MessageHandler messageHandler, boolean restart)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- if (logger.isDebugEnabled())
- logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
+ public void stopDelivery(final ByteString topic, final ByteString subscriberId)
+ throws ClientNotSubscribedException {
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- // Make sure we know about this topic subscription on the client side
- // exists. The assumption is that the client should have in memory the
- // Channel created for the TopicSubscriber once the server has sent
- // an ack response to the initial subscribe request.
- Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
- if (topicSubscriberChannel == null) {
- logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
- throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
- }
-
- // Need to ensure the setting of handler and the readability of channel is in sync
- // as there's a race condition that connection recovery and user might call this at the same time
- MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
- if (restart) {
- // restart using existing msg handler
- messageHandler = existedMsgHandler;
- } else {
- // some has started delivery but not stop it
- if (null != existedMsgHandler) {
- throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
- }
- if (messageHandler != null) {
- if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
- throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
- }
- }
- }
- try {
- HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
- .setMessageHandler(messageHandler);
- } catch (NoResponseHandlerException e) {
- // We did not find a response handler. So remove this subscription handler and throw an exception.
- topicSubscriber2MessageHandler.remove(topicSubscriber, existedMsgHandler);
- asyncCloseSubscription(topic, subscriberId, new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- logger.warn("Closed subscription for topic " + topic.toStringUtf8() + " and subscriber " +
- subscriberId.toStringUtf8());
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.warn("Error while closing subscription for topic " + topic.toStringUtf8() + " and subscriber " +
- subscriberId.toStringUtf8());
- }
- }, null);
-
- // We should tell the client to resubscribe.
- throw new ClientNotSubscribedException("Closed subscription for topic " + topic.toStringUtf8() + " and" +
- "subscriber Id " + subscriberId.toStringUtf8());
- }
- // Now make the TopicSubscriber Channel readable (it is set to not be
- // readable when the initial subscription is done). Note that this is an
- // asynchronous call. If this fails (not likely), the futureListener
- // will just log an error message for now.
- ChannelFuture future = topicSubscriberChannel.setReadable(true);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
- }
- }
- });
- }
-
- public void stopDelivery(final ByteString topic, final ByteString subscriberId) throws ClientNotSubscribedException {
- if (logger.isDebugEnabled())
- logger.debug("Stopping delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
- TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- // Make sure we know that this topic subscription on the client side
- // exists. The assumption is that the client should have in memory the
- // Channel created for the TopicSubscriber once the server has sent
- // an ack response to the initial subscribe request.
- Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
- if (topicSubscriberChannel == null) {
- logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
- throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
- }
-
- // Unregister the MessageHandler for the subscribe Channel's
- // Response Handler.
- try {
- HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
- .setMessageHandler(null);
- } catch (NoResponseHandlerException e) {
- // Here it's okay if we can't set the response handler's message handler to null. We should just remove it.
- logger.warn("Could not set message handler to null for subscription channel " + topicSubscriberChannel + ", ignoring.");
- }
- this.topicSubscriber2MessageHandler.remove(topicSubscriber);
- // Now make the TopicSubscriber channel not-readable. This will buffer
- // up messages if any are sent from the server. Note that this is an
- // asynchronous call. If this fails (not likely), the futureListener
- // will just log an error message for now.
- ChannelFuture future = topicSubscriberChannel.setReadable(false);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
- }
- }
- });
+ logger.debug("Stopping delivery for {}.", topicSubscriber);
+ channelManager.stopDelivery(topicSubscriber);
}
public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
@@ -744,94 +400,22 @@ public class HedwigSubscriber implements
public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
final Callback<Void> callback, final Object context) {
doAsyncCloseSubscription(topic, subscriberId,
- new VoidCallbackAdapter<PubSubProtocol.ResponseBody> (callback), context);
+ new VoidCallbackAdapter<ResponseBody>(callback), context);
}
private void doAsyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
- final Callback<PubSubProtocol.ResponseBody> callback, final Object context) {
- if (logger.isDebugEnabled())
- logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
+ final Callback<ResponseBody> callback, final Object context) {
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- // Remove all cached references for the TopicSubscriber
- Channel channel = topicSubscriber2Channel.remove(topicSubscriber);
- if (channel != null) {
- // Close the subscribe channel asynchronously.
- try {
- HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
- } catch (NoResponseHandlerException e) {
- // Don't close the channel if you can't find the handler.
- logger.warn("No response handler found, so could not close subscription channel " + channel);
- }
- // We still close the channel as this is an unexpected event and should be handled as one.
- ChannelFuture future = channel.close();
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Failed to close the subscription channel for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
- callback.operationFailed(context, new ServiceDownException(
- "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8()));
- } else {
- callback.operationFinished(context, null);
- }
- }
- });
- } else {
- logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
- callback.operationFinished(context, null);
- }
- }
-
- // Public getter and setters for entries in the topic2Host Map.
- // This is used for classes that need this information but are not in the
- // same classpath.
- public Channel getChannelForTopic(TopicSubscriber topic) {
- return topicSubscriber2Channel.get(topic);
- }
-
- public void setChannelAndPreferencesForTopic(TopicSubscriber topic, Channel channel,
- SubscriptionPreferences preferences) {
- synchronized (closeLock) {
- if (closed) {
- channel.close();
- return;
- }
- Channel oldc = topicSubscriber2Channel.putIfAbsent(topic, channel);
- if (oldc != null) {
- logger.warn("Dropping new channel for " + topic + ", due to existing channel: " + oldc);
- channel.close();
- }
- if (null != preferences) {
- topicSubscriber2Preferences.put(topic, preferences);
- }
- }
- }
-
- public void removeTopicSubscriber(TopicSubscriber topic) {
- synchronized (topic) {
- topicSubscriber2Preferences.remove(topic);
- topicSubscriber2Channel.remove(topic);
- }
- }
-
- void close() {
- synchronized (closeLock) {
- closed = true;
- }
-
- // Close all of the open Channels.
- for (Channel channel : topicSubscriber2Channel.values()) {
- try {
- client.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
- } catch (NoResponseHandlerException e) {
- logger.error("No response handler found while trying to close subscription channel.");
- }
- channel.close().awaitUninterruptibly();
+ logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
+ // We only stop delivery here not in channel manager
+ // Because channelManager#asyncCloseSubscription will called
+ // when subscription channel disconnected to clear local subscription
+ try {
+ channelManager.stopDelivery(topicSubscriber);
+ } catch (ClientNotSubscribedException cnse) {
+ // it is OK to ignore the exception when closing subscription
}
- topicSubscriber2Channel.clear();
+ logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
+ channelManager.asyncCloseSubscription(topicSubscriber, callback, context);
}
}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+
+/**
+ * Utilities for network operations.
+ */
+public class NetUtils {
+
+ /**
+ * Helper static method to get the String Hostname:Port from a netty
+ * Channel. Assumption is that the netty Channel was originally created with
+ * an InetSocketAddress. This is true with the Hedwig netty implementation.
+ *
+ * @param channel
+ * Netty channel to extract the hostname and port from.
+ * @return String representation of the Hostname:Port from the Netty Channel
+ */
+ public static InetSocketAddress getHostFromChannel(Channel channel) {
+ return (InetSocketAddress) channel.getRemoteAddress();
+ }
+
+ /**
+ * This is a helper method to build the actual pub/sub message.
+ *
+ * @param txnId
+ * Transaction Id.
+ * @param pubSubData
+ * Publish call's data wrapper object.
+ * @return pub sub request to send
+ */
+ public static PubSubRequest.Builder buildPubSubRequest(long txnId,
+ PubSubData pubSubData) {
+ // Create a PubSubRequest
+ PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
+ pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+ pubsubRequestBuilder.setType(pubSubData.operationType);
+ // for consume request, we don't need to care about tried servers list
+ if (OperationType.CONSUME != pubSubData.operationType) {
+ if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
+ pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
+ }
+ }
+ pubsubRequestBuilder.setTxnId(txnId);
+ pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
+ pubsubRequestBuilder.setTopic(pubSubData.topic);
+
+ switch (pubSubData.operationType) {
+ case PUBLISH:
+ // Set the PublishRequest into the outer PubSubRequest
+ pubsubRequestBuilder.setPublishRequest(buildPublishRequest(pubSubData));
+ break;
+ case SUBSCRIBE:
+ // Set the SubscribeRequest into the outer PubSubRequest
+ pubsubRequestBuilder.setSubscribeRequest(buildSubscribeRequest(pubSubData));
+ break;
+ case UNSUBSCRIBE:
+ // Set the UnsubscribeRequest into the outer PubSubRequest
+ pubsubRequestBuilder.setUnsubscribeRequest(buildUnsubscribeRequest(pubSubData));
+ break;
+ }
+
+ // Update the PubSubData with the txnId and the requestWriteTime
+ pubSubData.txnId = txnId;
+ pubSubData.requestWriteTime = System.currentTimeMillis();
+
+ return pubsubRequestBuilder;
+ }
+
+ // build publish request
+ private static PublishRequest.Builder buildPublishRequest(PubSubData pubSubData) {
+ PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder();
+ publishRequestBuilder.setMsg(pubSubData.msg);
+ return publishRequestBuilder;
+ }
+
+ // build subscribe request
+ private static SubscribeRequest.Builder buildSubscribeRequest(PubSubData pubSubData) { SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
+ subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+ subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
+ subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach());
+ // For now, all subscribes should wait for all cross-regional
+ // subscriptions to be established before returning.
+ subscribeRequestBuilder.setSynchronous(true);
+ // set subscription preferences
+ SubscriptionPreferences.Builder preferencesBuilder =
+ options2Preferences(pubSubData.options);
+ // backward compatable with 4.1.0
+ if (preferencesBuilder.hasMessageBound()) {
+ subscribeRequestBuilder.setMessageBound(preferencesBuilder.getMessageBound());
+ }
+ subscribeRequestBuilder.setPreferences(preferencesBuilder);
+ return subscribeRequestBuilder;
+ }
+
+ // build unsubscribe request
+ private static UnsubscribeRequest.Builder buildUnsubscribeRequest(PubSubData pubSubData) {
+ // Create the UnSubscribeRequest
+ UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder();
+ unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+ return unsubscribeRequestBuilder;
+ }
+
+ /**
+ * Build consume request
+ *
+ * @param txnId
+ * Transaction Id.
+ * @param topicSubscriber
+ * Topic Subscriber.
+ * @param messageSeqId
+ * Message Seq Id.
+ * @return pub/sub request.
+ */
+ public static PubSubRequest.Builder buildConsumeRequest(long txnId,
+ TopicSubscriber topicSubscriber,
+ MessageSeqId messageSeqId) {
+ // Create a PubSubRequest
+ PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
+ pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
+ pubsubRequestBuilder.setType(OperationType.CONSUME);
+
+ pubsubRequestBuilder.setTxnId(txnId);
+ pubsubRequestBuilder.setTopic(topicSubscriber.getTopic());
+
+ // Create the ConsumeRequest
+ ConsumeRequest.Builder consumeRequestBuilder = ConsumeRequest.newBuilder();
+ consumeRequestBuilder.setSubscriberId(topicSubscriber.getSubscriberId());
+ consumeRequestBuilder.setMsgId(messageSeqId);
+
+ pubsubRequestBuilder.setConsumeRequest(consumeRequestBuilder);
+
+ return pubsubRequestBuilder;
+ }
+
+ /**
+ * Convert client-side subscription options to subscription preferences
+ *
+ * @param options
+ * Client-Side subscription options
+ * @return subscription preferences
+ */
+ private static SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
+ // prepare subscription preferences
+ SubscriptionPreferences.Builder preferencesBuilder =
+ SubscriptionPreferences.newBuilder();
+
+ // set message bound
+ if (options.getMessageBound() > 0) {
+ preferencesBuilder.setMessageBound(options.getMessageBound());
+ }
+
+ // set message filter
+ if (options.hasMessageFilter()) {
+ preferencesBuilder.setMessageFilter(options.getMessageFilter());
+ }
+
+ // set user options
+ if (options.hasOptions()) {
+ preferencesBuilder.setOptions(options.getOptions());
+ }
+
+ return preferencesBuilder;
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java?rev=1390777&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java Wed Sep 26 23:52:18 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.util.SubscriptionListener;
+
+public class SubscriptionEventEmitter {
+
+ private final CopyOnWriteArraySet<SubscriptionListener> listeners;
+
+ public SubscriptionEventEmitter() {
+ listeners = new CopyOnWriteArraySet<SubscriptionListener>();
+ }
+
+ public void addSubscriptionListener(SubscriptionListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeSubscriptionListener(SubscriptionListener listener) {
+ listeners.remove(listener);
+ }
+
+ public void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event) {
+ for (SubscriptionListener listener : listeners) {
+ listener.processEvent(topic, subscriberId, event);
+ }
+ }
+
+}