You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/11/02 17:15:01 UTC
svn commit: r1405028 [1/3] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/conf/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig...
Author: ivank
Date: Fri Nov 2 16:14:59 2012
New Revision: 1405028
URL: http://svn.apache.org/viewvc?rev=1405028&view=rev
Log:
BOOKKEEPER-368 Implementing multiplexing java client. (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
Removed:
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Nov 2 16:14:59 2012
@@ -224,6 +224,8 @@ Trunk (unreleased changes)
BOOKKEEPER-369: re-factor hedwig cpp client to provide better interface to support both one-subscription-per-channel and multiple-subscriptions-per-channel. (sijie via ivank)
+ BOOKKEEPER-368: Implementing multiplexing java client. (sijie via ivank)
+
Release 4.1.0 - 2012-06-07
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java Fri Nov 2 16:14:59 2012
@@ -42,6 +42,7 @@ public class ClientConfiguration extends
protected static final String TIMEOUT_THREAD_RUN_INTERVAL = "timeout_thread_run_interval";
protected static final String SSL_ENABLED = "ssl_enabled";
protected static final String SUBSCRIPTION_MESSAGE_BOUND = "subscription_message_bound";
+ protected static final String MULTIPLEXING_ENABLED = "multiplexing_enabled";
// Singletons we want to instantiate only once per ClientConfiguration
protected HedwigSocketAddress myDefaultServerAddress = null;
@@ -140,6 +141,14 @@ public class ClientConfiguration extends
}
/**
+ * This parameter is a boolean flag indicating if multiplexing subscription
+ * channels.
+ */
+ public boolean isMultiplexingEnabled() {
+ return conf.getBoolean(MULTIPLEXING_ENABLED, false);
+ }
+
+ /**
* The maximum number of messages the hub will queue for subscriptions
* created using this configuration. The hub will always queue the most
* recent messages. If there are enough publishes to the topic to hit
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Fri Nov 2 16:14:59 2012
@@ -23,6 +23,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
+import com.google.protobuf.ByteString;
+
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.TopicSubscriber;
@@ -34,6 +36,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
@@ -61,6 +64,20 @@ public abstract class SubscribeResponseH
public abstract void handleSubscribeMessage(PubSubResponse response);
/**
+ * Handle a subscription event delivered by the server.
+ *
+ * @param topic
+ * Topic Name
+ * @param subscriberId
+ * Subscriber Id
+ * @param event
+ * Subscription Event describes its status
+ */
+ public abstract void handleSubscriptionEvent(ByteString topic,
+ ByteString subscriberId,
+ SubscriptionEvent event);
+
+ /**
* Method called when a message arrives for a subscribe Channel and we want
* to deliver it asynchronously via the registered MessageHandler (should
* not be null when called here).
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Fri Nov 2 16:14:59 2012
@@ -29,6 +29,7 @@ import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.Client;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager;
+import org.apache.hedwig.client.netty.impl.multiplex.MultiplexHChannelManager;
/**
* This is a top level Hedwig Client class that encapsulates the common
@@ -73,8 +74,11 @@ public class HedwigClientImpl implements
protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
this.cfg = cfg;
this.socketFactory = socketFactory;
- channelManager = new SimpleHChannelManager(cfg, socketFactory);
-
+ if (cfg.isMultiplexingEnabled()) {
+ channelManager = new MultiplexHChannelManager(cfg, socketFactory);
+ } else {
+ channelManager = new SimpleHChannelManager(cfg, socketFactory);
+ }
pub = new HedwigPublisher(this);
sub = new HedwigSubscriber(this);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java Fri Nov 2 16:14:59 2012
@@ -43,7 +43,9 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse;
import static org.apache.hedwig.util.VarArgs.va;
@ChannelPipelineCoverage("all")
@@ -119,6 +121,27 @@ public class HChannelHandler extends Sim
return;
}
+ // Process Subscription Events
+ if (response.hasResponseBody()) {
+ ResponseBody resp = response.getResponseBody();
+ // A special subscription event indicates the state of a subscriber
+ if (resp.hasSubscriptionEvent()) {
+ if (null == subHandler) {
+ logger.error("Received subscription event from a non-subscription channel : {}",
+ response);
+ } else {
+ SubscriptionEventResponse eventResp = resp.getSubscriptionEvent();
+ logger.debug("Received subscription event {} for (topic:{}, subscriber:{}).",
+ va(eventResp.getEvent(), response.getTopic(),
+ response.getSubscriberId()));
+ subHandler.handleSubscriptionEvent(response.getTopic(),
+ response.getSubscriberId(),
+ eventResp.getEvent());
+ }
+ return;
+ }
+ }
+
// Response is an ack to a prior PubSubRequest so first retrieve the
// PubSub data for this txn.
PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java Fri Nov 2 16:14:59 2012
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.CleanupChannelMap;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.client.netty.impl.HChannelImpl;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+
+/**
+ * Multiplex HChannel Manager which establish a connection for multi subscriptions.
+ */
+public class MultiplexHChannelManager extends AbstractHChannelManager {
+
+ static final Logger logger = LoggerFactory.getLogger(MultiplexHChannelManager.class);
+
+ // Find which HChannel that a given TopicSubscriber used.
+ protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;
+
+ // Concurrent Map to store Message handler for each topic + sub id combination.
+ // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
+ // user set when connection is recovered
+ protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
+ = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
+
+ // PipelineFactory to create subscription netty channels to the appropriate server
+ private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
+
+ public MultiplexHChannelManager(ClientConfiguration cfg,
+ ChannelFactory socketFactory) {
+ super(cfg, socketFactory);
+ subscriptionChannels = new CleanupChannelMap<InetSocketAddress>();
+ subscriptionChannelPipelineFactory =
+ new MultiplexSubscriptionChannelPipelineFactory(cfg, this);
+ }
+
+ @Override
+ protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
+ return subscriptionChannelPipelineFactory;
+ }
+
+ @Override
+ protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
+ // store the channel connected to target host for future usage
+ InetSocketAddress host = NetUtils.getHostFromChannel(channel);
+ HChannel newHChannel = new HChannelImpl(host, channel, this,
+ getSubscriptionChannelPipelineFactory());
+ return storeSubscriptionChannel(host, newHChannel);
+ }
+
+ @Override
+ protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
+ HChannel newHChannel = new HChannelImpl(host, this,
+ getSubscriptionChannelPipelineFactory());
+ return storeSubscriptionChannel(host, newHChannel);
+ }
+
+ private HChannel storeSubscriptionChannel(InetSocketAddress host,
+ HChannel newHChannel) {
+ // here, we guarantee there is only one channel used to communicate with target
+ // host.
+ return subscriptionChannels.addChannel(host, newHChannel);
+ }
+
+ @Override
+ protected HChannel getSubscriptionChannel(InetSocketAddress host) {
+ return subscriptionChannels.getChannel(host);
+ }
+
+ protected HChannel getSubscriptionChannel(TopicSubscriber subscriber) {
+ InetSocketAddress host = topic2Host.get(subscriber.getTopic());
+ if (null == host) {
+ // we don't know where is the owner of the topic
+ return null;
+ } else {
+ return getSubscriptionChannel(host);
+ }
+ }
+
+ @Override
+ protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
+ InetSocketAddress host = topic2Host.get(subscriber.getTopic());
+ if (null == host) {
+ // we don't know where is the topic
+ return null;
+ } else {
+ // we had know which server owned the topic
+ HChannel channel = getSubscriptionChannel(host);
+ if (null == channel) {
+ // create a channel to connect to sepcified host
+ channel = createAndStoreSubscriptionChannel(host);
+ }
+ return channel;
+ }
+ }
+
+ @Override
+ protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
+ Channel channel) {
+ HChannel hChannel = subscriptionChannels.getChannel(host);
+ if (null == hChannel) {
+ return;
+ }
+ Channel underlyingChannel = hChannel.getChannel();
+ if (null == underlyingChannel ||
+ !underlyingChannel.equals(channel)) {
+ return;
+ }
+ logger.info("Subscription Channel {} disconnected from {}.",
+ va(channel, host));
+ // remove existed channel
+ if (subscriptionChannels.removeChannel(host, hChannel)) {
+ try {
+ HChannelHandler channelHandler =
+ HChannelImpl.getHChannelHandlerFromChannel(channel);
+ channelHandler.getSubscribeResponseHandler()
+ .onChannelDisconnected(host, channel);
+ } catch (NoResponseHandlerException nrhe) {
+ logger.warn("No Channel Handler found for channel {} when it disconnected.",
+ channel);
+ }
+ }
+ }
+
+ @Override
+ public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
+ HChannel hChannel = getSubscriptionChannel(topicSubscriber);
+ if (null == hChannel) {
+ return null;
+ }
+ Channel channel = hChannel.getChannel();
+ if (null == channel) {
+ return null;
+ }
+ try {
+ HChannelHandler channelHandler =
+ HChannelImpl.getHChannelHandlerFromChannel(channel);
+ return channelHandler.getSubscribeResponseHandler();
+ } catch (NoResponseHandlerException nrhe) {
+ logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
+ channel, topicSubscriber);
+ return null;
+ }
+ }
+
+ @Override
+ public void startDelivery(TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ startDelivery(topicSubscriber, messageHandler, false);
+ }
+
+ protected void restartDelivery(TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ startDelivery(topicSubscriber, null, true);
+ }
+
+ private void startDelivery(TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler,
+ boolean restart)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ // Make sure we know about this topic subscription on the client side
+ // exists. The assumption is that the client should have in memory the
+ // Channel created for the TopicSubscriber once the server has sent
+ // an ack response to the initial subscribe request.
+ SubscribeResponseHandler subscribeResponseHandler =
+ getSubscribeResponseHandler(topicSubscriber);
+ if (null == subscribeResponseHandler ||
+ !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+ logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+ throw new ClientNotSubscribedException("Client is not yet subscribed to "
+ + topicSubscriber);
+ }
+
+ MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
+ if (restart) {
+ // restart using existing msg handler
+ messageHandler = existedMsgHandler;
+ } else {
+ // some has started delivery but not stop it
+ if (null != existedMsgHandler) {
+ throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
+ }
+ if (messageHandler != null) {
+ if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
+ throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
+ }
+ }
+ }
+
+ // tell subscribe response handler to start delivering messages for topicSubscriber
+ subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
+ }
+
+ public void stopDelivery(TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException {
+ // Make sure we know that this topic subscription on the client side
+ // exists. The assumption is that the client should have in memory the
+ // Channel created for the TopicSubscriber once the server has sent
+ // an ack response to the initial subscribe request.
+ SubscribeResponseHandler subscribeResponseHandler =
+ getSubscribeResponseHandler(topicSubscriber);
+ if (null == subscribeResponseHandler ||
+ !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+ logger.error("Client is not yet subscribed to {}.", topicSubscriber);
+ throw new ClientNotSubscribedException("Client is not yet subscribed to "
+ + topicSubscriber);
+ }
+
+ // tell subscribe response handler to stop delivering messages for a given topic subscriber
+ topicSubscriber2MessageHandler.remove(topicSubscriber);
+ subscribeResponseHandler.stopDelivery(topicSubscriber);
+ }
+
+
+ @Override
+ public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+ final Callback<ResponseBody> callback,
+ final Object context) {
+ SubscribeResponseHandler subscribeResponseHandler =
+ getSubscribeResponseHandler(topicSubscriber);
+ if (null == subscribeResponseHandler ||
+ !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
+ logger.warn("Trying to close a subscription when we don't have a subscription channel cached for {}",
+ topicSubscriber);
+ callback.operationFinished(context, (ResponseBody)null);
+ return;
+ }
+ subscribeResponseHandler.asyncCloseSubscription(topicSubscriber, callback, context);
+ }
+
+ @Override
+ protected void checkTimeoutRequestsOnSubscriptionChannels() {
+ // timeout task may be started before constructing subscriptionChannels
+ if (null == subscriptionChannels) {
+ return;
+ }
+ for (HChannel channel : subscriptionChannels.getChannels()) {
+ try {
+ HChannelHandler channelHandler =
+ HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
+ channelHandler.checkTimeoutRequests();
+ } catch (NoResponseHandlerException nrhe) {
+ continue;
+ }
+ }
+ }
+
+ @Override
+ protected void closeSubscriptionChannels() {
+ subscriptionChannels.close();
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java Fri Nov 2 16:14:59 2012
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.MessageConsumeData;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.client.netty.HChannel;
+import org.apache.hedwig.client.netty.NetUtils;
+import org.apache.hedwig.client.netty.FilterableMessageHandler;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.SubscriptionListener;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class MultiplexSubscribeResponseHandler extends SubscribeResponseHandler {
+
+ private static Logger logger =
+ LoggerFactory.getLogger(MultiplexSubscribeResponseHandler.class);
+
+ class ActiveSubscriber implements SubscriptionListener {
+ private final TopicSubscriber topicSubscriber;
+ private final PubSubData op;
+ private final SubscriptionPreferences preferences;
+
+ // the underlying netty channel to send request
+ private final Channel channel;
+
+ // Counter for the number of consumed messages so far to buffer up before we
+ // send the Consume message back to the server along with the last/largest
+ // message seq ID seen so far in that batch.
+ private int numConsumedMessagesInBuffer = 0;
+ private MessageSeqId lastMessageSeqId;
+
+ // Message Handler
+ private MessageHandler msgHandler;
+
+ // Queue used for subscribes when the MessageHandler hasn't been registered
+ // yet but we've already received subscription messages from the server.
+ // This will be lazily created as needed.
+ private Queue<Message> msgQueue = new LinkedList<Message>();
+
+ ActiveSubscriber(TopicSubscriber ts, PubSubData op,
+ SubscriptionPreferences preferences,
+ Channel channel) {
+ this.topicSubscriber = ts;
+ this.op = op;
+ this.preferences = preferences;
+ this.channel = channel;
+ }
+
+ PubSubData getPubSubData() {
+ return this.op;
+ }
+
+ TopicSubscriber getTopicSubscriber() {
+ return this.topicSubscriber;
+ }
+
+ synchronized boolean updateLastMessageSeqId(MessageSeqId seqId) {
+ if (null != lastMessageSeqId &&
+ seqId.getLocalComponent() <= lastMessageSeqId.getLocalComponent()) {
+ return false;
+ }
+ ++numConsumedMessagesInBuffer;
+ lastMessageSeqId = seqId;
+ if (numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
+ numConsumedMessagesInBuffer = 0;
+ lastMessageSeqId = null;
+ return true;
+ }
+ return false;
+ }
+
+ synchronized void startDelivery(MessageHandler messageHandler)
+ throws AlreadyStartDeliveryException, ClientNotSubscribedException {
+ if (null != this.msgHandler) {
+ throw new AlreadyStartDeliveryException("A message handler " + msgHandler
+ + " has been started for " + topicSubscriber);
+ }
+ if (null != messageHandler && messageHandler instanceof FilterableMessageHandler) {
+ FilterableMessageHandler filterMsgHandler =
+ (FilterableMessageHandler) messageHandler;
+ if (filterMsgHandler.hasMessageFilter()) {
+ if (null == preferences) {
+ // no preferences means talking to an old version hub server
+ logger.warn("Start delivering messages with filter but no subscription "
+ + "preferences found. It might due to talking to an old version"
+ + " hub server.");
+ // use the original message handler.
+ messageHandler = filterMsgHandler.getMessageHandler();
+ } else {
+ // pass subscription preferences to message filter
+ if (logger.isDebugEnabled()) {
+ logger.debug("Start delivering messages with filter on {}, preferences: {}",
+ va(topicSubscriber,
+ SubscriptionStateUtils.toString(preferences)));
+ }
+ ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
+ msgFilter.setSubscriptionPreferences(topicSubscriber.getTopic(),
+ topicSubscriber.getSubscriberId(),
+ preferences);
+ }
+ }
+ }
+
+ this.msgHandler = messageHandler;
+ // Once the MessageHandler is registered, see if we have any queued up
+ // subscription messages sent to us already from the server. If so,
+ // consume those first. Do this only if the MessageHandler registered is
+ // not null (since that would be the HedwigSubscriber.stopDelivery
+ // call).
+ if (null == msgHandler) {
+ return;
+ }
+ if (msgQueue.size() > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Consuming {} queued up messages for {}",
+ va(msgQueue.size(), topicSubscriber));
+ }
+ for (Message message : msgQueue) {
+ asyncMessageDeliver(message);
+ }
+ // Now we can remove the queued up messages since they are all
+ // consumed.
+ msgQueue.clear();
+ }
+ }
+
+ synchronized void stopDelivery() {
+ this.msgHandler = null;
+ }
+
+ synchronized void handleMessage(Message message) {
+ if (null != msgHandler) {
+ asyncMessageDeliver(message);
+ } else {
+ // MessageHandler has not yet been registered so queue up these
+ // messages for the Topic Subscription. Make the initial lazy
+ // creation of the message queue thread safe just so we don't
+ // run into a race condition where two simultaneous threads process
+ // a received message and both try to create a new instance of
+ // the message queue. Performance overhead should be okay
+ // because the delivery of the topic has not even started yet
+ // so these messages are not consumed and just buffered up here.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message {} has arrived but no MessageHandler provided for {}"
+ + " yet so queueing up the message.",
+ va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
+ topicSubscriber));
+ }
+ msgQueue.add(message);
+ }
+ }
+
+ synchronized void asyncMessageDeliver(Message message) {
+ if (null == msgHandler) {
+ logger.error("No message handler found to deliver message {} to {}.",
+ va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
+ topicSubscriber));
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
+ va(message, topicSubscriber));
+ }
+ MessageConsumeData messageConsumeData =
+ new MessageConsumeData(topicSubscriber, message);
+ msgHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
+ message, sChannelManager.getConsumeCallback(),
+ messageConsumeData);
+ }
+
+ void consume(final MessageSeqId messageSeqId) {
+ PubSubRequest.Builder pubsubRequestBuilder =
+ NetUtils.buildConsumeRequest(sChannelManager.nextTxnId(),
+ topicSubscriber, messageSeqId);
+
+ // For Consume requests, we will send them from the client in a fire and
+ // forget manner. We are not expecting the server to send back an ack
+ // response so no need to register this in the ResponseHandler. There
+ // are no callbacks to invoke since this isn't a client initiated
+ // action. Instead, just have a future listener that will log an error
+ // message if there was a problem writing the consume request.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
+ va(NetUtils.getHostFromChannel(channel),
+ messageSeqId, topicSubscriber));
+ }
+ ChannelFuture future = channel.write(pubsubRequestBuilder.build());
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
+ va(host, messageSeqId, topicSubscriber));
+ }
+ }
+ });
+ }
+
+ @Override
+ public void processEvent(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event) {
+ switch (event) {
+ // for all cases we need to resubscribe for the subscription
+ case TOPIC_MOVED:
+ sChannelManager.clearHostForTopic(topic, NetUtils.getHostFromChannel(channel));
+ resubscribeIfNecessary(event);
+ break;
+ case SUBSCRIPTION_FORCED_CLOSED:
+ resubscribeIfNecessary(event);
+ break;
+ default:
+ logger.error("Receive unknown subscription event {} for {}.",
+ va(event, topicSubscriber));
+ }
+ }
+
+ private void resubscribeIfNecessary(SubscriptionEvent event) {
+ // if subscriber has been changed, we don't need to resubscribe
+ if (!subscriptions.remove(topicSubscriber, this)) {
+ return;
+ }
+ if (!op.options.getEnableResubscribe()) {
+ sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
+ topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), event);
+ return;
+ }
+ // Since the connection to the server host that was responsible
+ // for the topic died, we are not sure about the state of that
+ // server. Resend the original subscribe request data to the default
+ // server host/VIP. Also clear out all of the servers we've
+ // contacted or attempted to from this request as we are starting a
+ // "fresh" subscribe request.
+ op.clearServersList();
+ // Set a new type of VoidCallback for this async call. We need this
+ // hook so after the resubscribe has completed, delivery for
+ // that topic subscriber should also be restarted (if it was that
+ // case before the channel disconnect).
+ final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
+ ResubscribeCallback resubscribeCb =
+ new ResubscribeCallback(topicSubscriber, op,
+ sChannelManager, retryWaitTime);
+ op.setCallback(resubscribeCb);
+ op.context = null;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Resubscribe {} with origSubData {}",
+ va(topicSubscriber, op));
+ }
+ // resubmit the request
+ sChannelManager.submitOp(op);
+ }
+ }
+
+ protected final ReentrantReadWriteLock disconnectLock =
+ new ReentrantReadWriteLock();
+
+ // the underlying subscription channel
+ volatile HChannel hChannel;
+ InetSocketAddress host;
+ protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions
+ = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>();
+ private final MultiplexHChannelManager sChannelManager;
+
+ protected MultiplexSubscribeResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
+ sChannelManager = (MultiplexHChannelManager) channelManager;
+ }
+
+ protected HChannelManager getHChannelManager() {
+ return this.sChannelManager;
+ }
+
+ protected ClientConfiguration getConfiguration() {
+ return cfg;
+ }
+
+ private ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
+ return subscriptions.get(ts);
+ }
+
+ @Override
+ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
+ Channel channel) throws Exception {
+ if (null == hChannel) {
+ host = NetUtils.getHostFromChannel(channel);
+ hChannel = sChannelManager.getSubscriptionChannel(host);
+ if (null == hChannel ||
+ !channel.equals(hChannel.getChannel())) {
+ PubSubException pse =
+ new UnexpectedConditionException("Failed to get subscription channel of " + host);
+ pubSubData.getCallback().operationFailed(pubSubData.context, pse);
+ return;
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
+ va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
+ }
+ switch (response.getStatusCode()) {
+ case SUCCESS:
+ TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
+ pubSubData.subscriberId);
+ SubscriptionPreferences preferences = null;
+ if (response.hasResponseBody()) {
+ ResponseBody respBody = response.getResponseBody();
+ if (respBody.hasSubscribeResponse()) {
+ SubscribeResponse resp = respBody.getSubscribeResponse();
+ if (resp.hasPreferences()) {
+ preferences = resp.getPreferences();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Receive subscription preferences for {} : {}",
+ va(ts,
+ SubscriptionStateUtils.toString(preferences)));
+ }
+ }
+ }
+ }
+
+ ActiveSubscriber ss = new ActiveSubscriber(ts, pubSubData, preferences,
+ channel);
+
+ boolean success = false;
+ // Store the Subscribe state
+ disconnectLock.readLock().lock();
+ try {
+ ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
+ if (null != oldSS) {
+ logger.warn("Subscribe {} has existed in channel {}.",
+ va(ts, channel));
+ success = false;
+ } else {
+ logger.debug("Succeed to add subscription {} in channel {}.",
+ va(ts, channel));
+ success = true;
+ }
+ } finally {
+ disconnectLock.readLock().unlock();
+ }
+ if (success) {
+ // Response was success so invoke the callback's operationFinished
+ // method.
+ pubSubData.getCallback().operationFinished(pubSubData.context, null);
+ } else {
+ ClientAlreadySubscribedException exception =
+ new ClientAlreadySubscribedException("Client is already subscribed for " + ts);
+ pubSubData.getCallback().operationFailed(pubSubData.context, exception);
+ }
+ break;
+ case CLIENT_ALREADY_SUBSCRIBED:
+ // For Subscribe requests, the server says that the client is
+ // already subscribed to it.
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
+ "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+ + pubSubData.subscriberId.toStringUtf8()));
+ break;
+ case SERVICE_DOWN:
+ // Response was service down failure so just invoke the callback's
+ // operationFailed method.
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a SERVICE_DOWN status"));
+ break;
+ case NOT_RESPONSIBLE_FOR_TOPIC:
+ // Redirect response so we'll need to repost the original Subscribe
+ // Request
+ handleRedirectResponse(response, pubSubData, channel);
+ break;
+ default:
+ // Consider all other status codes as errors, operation failed
+ // cases.
+ logger.error("Unexpected error response from server for PubSubResponse: " + response);
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a status code of: " + response.getStatusCode()));
+ break;
+ }
+ }
+
+ @Override
+ public void handleSubscribeMessage(PubSubResponse response) {
+ Message message = response.getMessage();
+ TopicSubscriber ts = new TopicSubscriber(response.getTopic(),
+ response.getSubscriberId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a Subscribe message in response: {}, {}",
+ va(response, ts));
+ }
+ ActiveSubscriber ss = getActiveSubscriber(ts);
+ if (null == ss) {
+ logger.error("Subscriber {} is not found receiving its message {}.",
+ va(ts, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+ return;
+ }
+ ss.handleMessage(message);
+ }
+
+ @Override
+ protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
+ Message message) {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ logger.error("Subscriber {} is not found delivering its message {}.",
+ va(topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+ return;
+ }
+ ss.asyncMessageDeliver(message);
+ }
+
+ @Override
+ protected void messageConsumed(TopicSubscriber topicSubscriber,
+ Message message) {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ logger.warn("Subscriber {} is not found consumed its message {}.",
+ va(topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(message.getMsgId())));
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message has been successfully consumed by the client app : {}, {}",
+ va(message, topicSubscriber));
+ }
+ // For consume response to server, there is a config param on how many
+ // messages to consume and buffer up before sending the consume request.
+ // We just need to keep a count of the number of messages consumed
+ // and the largest/latest msg ID seen so far in this batch. Messages
+ // should be delivered in order and without gaps. Do this only if
+ // auto-sending of consume messages is enabled.
+ if (cfg.isAutoSendConsumeMessageEnabled()) {
+ // Update these variables only if we are auto-sending consume
+ // messages to the server. Otherwise the onus is on the client app
+ // to call the Subscriber consume API to let the server know which
+ // messages it has successfully consumed.
+ if (ss.updateLastMessageSeqId(message.getMsgId())) {
+ // Send the consume request and reset the consumed messages buffer
+ // variables. We will use the same Channel created from the
+ // subscribe request for the TopicSubscriber.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Consume message {} when reaching consumed message buffer limit.",
+ message.getMsgId());
+ }
+ ss.consume(message.getMsgId());
+ }
+ }
+ }
+
+ @Override
+ public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event) {
+ TopicSubscriber ts = new TopicSubscriber(topic, subscriberId);
+ ActiveSubscriber ss = getActiveSubscriber(ts);
+ if (null == ss) {
+ logger.warn("No subscription {} found receiving subscription event {}.",
+ va(ts, event));
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received subscription event {} for ({}).",
+ va(event, ts));
+ }
+ ss.processEvent(topic, subscriberId, event);
+ }
+
+ @Override
+ public void startDelivery(final TopicSubscriber topicSubscriber,
+ MessageHandler messageHandler)
+ throws ClientNotSubscribedException, AlreadyStartDeliveryException {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Start delivering message for {} using message handler {}",
+ va(topicSubscriber, messageHandler));
+ }
+ ss.startDelivery(messageHandler);
+ }
+
+ @Override
+ public void stopDelivery(final TopicSubscriber topicSubscriber)
+ throws ClientNotSubscribedException {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stop delivering messages for {}", topicSubscriber);
+ }
+ ss.stopDelivery();
+ }
+
+ @Override
+ public boolean hasSubscription(TopicSubscriber topicSubscriber) {
+ return subscriptions.containsKey(topicSubscriber);
+ }
+
+ @Override
+ public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
+ final Callback<ResponseBody> callback,
+ final Object context) {
+ final ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss || null == hChannel) {
+ logger.debug("No subscription {} found when closing its subscription from {}.",
+ va(topicSubscriber, hChannel));
+ callback.operationFinished(context, (ResponseBody)null);
+ return;
+ }
+ Callback<ResponseBody> closeCb = new Callback<ResponseBody>() {
+ @Override
+ public void operationFinished(Object ctx, ResponseBody respBody) {
+ disconnectLock.readLock().lock();
+ try {
+ subscriptions.remove(topicSubscriber, ss);
+ } finally {
+ disconnectLock.readLock().unlock();
+ }
+ callback.operationFinished(context, null);
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ callback.operationFailed(context, exception);
+ }
+ };
+ PubSubData closeOp = new PubSubData(topicSubscriber.getTopic(), null,
+ topicSubscriber.getSubscriberId(),
+ OperationType.CLOSESUBSCRIPTION,
+ null, closeCb, context);
+ hChannel.submitOp(closeOp);
+ }
+
+ @Override
+ public void consume(final TopicSubscriber topicSubscriber,
+ final MessageSeqId messageSeqId) {
+ ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
+ if (null == ss) {
+ logger.warn("Subscriber {} is not found consuming message {}.",
+ va(topicSubscriber,
+ MessageIdUtils.msgIdToReadableString(messageSeqId)));
+ return;
+ }
+ ss.consume(messageSeqId);
+ }
+
+ @Override
+ public void onChannelDisconnected(InetSocketAddress host, Channel channel) {
+ disconnectLock.writeLock().lock();
+ try {
+ onDisconnect(host);
+ } finally {
+ disconnectLock.writeLock().unlock();
+ }
+ }
+
+ private void onDisconnect(InetSocketAddress host) {
+ for (ActiveSubscriber ss : subscriptions.values()) {
+ onDisconnect(ss, host);
+ }
+ }
+
+ private void onDisconnect(ActiveSubscriber ss, InetSocketAddress host) {
+ TopicSubscriber ts = ss.getTopicSubscriber();
+ logger.info("Subscription channel for ({}) is disconnected.", ts);
+ ss.processEvent(ts.getTopic(), ts.getSubscriberId(),
+ SubscriptionEvent.TOPIC_MOVED);
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java Fri Nov 2 16:14:59 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
+import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
+import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
+import org.apache.hedwig.client.netty.impl.HChannelHandler;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+public class MultiplexSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
+
+ public MultiplexSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
+ MultiplexHChannelManager channelManager) {
+ super(cfg, channelManager);
+ }
+
+ @Override
+ protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
+ Map<OperationType, AbstractResponseHandler> handlers =
+ new HashMap<OperationType, AbstractResponseHandler>();
+ handlers.put(OperationType.SUBSCRIBE,
+ new MultiplexSubscribeResponseHandler(cfg, channelManager));
+ handlers.put(OperationType.CLOSESUBSCRIPTION,
+ new CloseSubscriptionResponseHandler(cfg, channelManager));
+ return handlers;
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java?rev=1405028&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/ResubscribeCallback.java Fri Nov 2 16:14:59 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty.impl.multiplex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+/**
+ * This class is used when a Subscribe channel gets disconnected and we attempt
+ * to resubmit subscribe requests existed in that channel. Once the resubscribe
+ * the topic is completed, we need to restart delivery for that topic.
+ */
+class ResubscribeCallback implements Callback<ResponseBody> {
+
+ private static Logger logger = LoggerFactory.getLogger(ResubscribeCallback.class);
+
+ // Private member variables
+ private final TopicSubscriber origTopicSubscriber;
+ private final PubSubData origSubData;
+ private final MultiplexHChannelManager channelManager;
+ private final long retryWaitTime;
+
+ // Constructor
+ ResubscribeCallback(TopicSubscriber origTopicSubscriber,
+ PubSubData origSubData,
+ MultiplexHChannelManager channelManager,
+ long retryWaitTime) {
+ this.origTopicSubscriber = origTopicSubscriber;
+ this.origSubData = origSubData;
+ this.channelManager = channelManager;
+ this.retryWaitTime = retryWaitTime;
+ }
+
+ @Override
+ public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
+ if (logger.isDebugEnabled())
+ logger.debug("Resubscribe succeeded for origSubData: " + origSubData);
+ // Now we want to restart delivery for the subscription channel only
+ // if delivery was started at the time the original subscribe channel
+ // was disconnected.
+ try {
+ channelManager.restartDelivery(origTopicSubscriber);
+ } catch (ClientNotSubscribedException e) {
+ // This exception should never be thrown here but just in case,
+ // log an error and just keep retrying the subscribe request.
+ logger.error("Subscribe was successful but error starting delivery for {} : {}",
+ va(origTopicSubscriber, e.getMessage()));
+ retrySubscribeRequest();
+ } catch (AlreadyStartDeliveryException asde) {
+ // should not reach here
+ }
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ // If the resubscribe fails, just keep retrying the subscribe
+ // request. There isn't a way to flag to the application layer that
+ // a topic subscription has failed. So instead, we'll just keep
+ // retrying in the background until success.
+ logger.error("Resubscribe failed with error: " + exception.getMessage());
+ retrySubscribeRequest();
+ }
+
+ private void retrySubscribeRequest() {
+ origSubData.clearServersList();
+ logger.debug("Resubmit subscribe request for {} in {} ms later.",
+ va(origTopicSubscriber, retryWaitTime));
+ channelManager.submitOpAfterDelay(origSubData, retryWaitTime);
+ }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java?rev=1405028&r1=1405027&r2=1405028&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java Fri Nov 2 16:14:59 2012
@@ -247,6 +247,21 @@ public class SimpleSubscribeResponseHand
}
@Override
+ public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event) {
+ Channel channel;
+ synchronized (this) {
+ channel = subscribeChannel;
+ }
+ if (null == channel) {
+ logger.warn("No subscription channel found when receiving subscription event {} for (topic:{}, subscriber:{}).",
+ va(event, topic, subscriberId));
+ return;
+ }
+ processSubscriptionEvent(event, NetUtils.getHostFromChannel(channel), channel);
+ }
+
+ @Override
protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
Message message) {
if (logger.isDebugEnabled()) {
@@ -488,7 +503,20 @@ public class SimpleSubscribeResponseHand
if (origTopicSubscriber == null) {
return;
}
- sChannelManager.clearHostForTopic(origTopicSubscriber.getTopic(), host);
+ processSubscriptionEvent(SubscriptionEvent.TOPIC_MOVED, host, channel);
+ }
+
+ private void processSubscriptionEvent(final SubscriptionEvent event, InetSocketAddress host,
+ final Channel channel) {
+ if (SubscriptionEvent.TOPIC_MOVED != event &&
+ SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED != event) {
+ logger.warn("Ignore subscription event {} received from channel {}.",
+ event, channel);
+ return;
+ }
+ if (SubscriptionEvent.TOPIC_MOVED == event) {
+ sChannelManager.clearHostForTopic(origTopicSubscriber.getTopic(), host);
+ }
// clear subscription status
sChannelManager.asyncCloseSubscription(origTopicSubscriber, new Callback<ResponseBody>() {
@@ -530,10 +558,10 @@ public class SimpleSubscribeResponseHand
origTopicSubscriber, origSubData);
sChannelManager.submitOpToDefaultServer(origSubData);
} else {
- logger.info("Subscription channel for ({}) is disconnected.",
- origTopicSubscriber);
+ logger.info("Subscription channel {} for ({}) is disconnected.",
+ channel, origTopicSubscriber);
sChannelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
- origSubData.topic, origSubData.subscriberId, SubscriptionEvent.TOPIC_MOVED);
+ origSubData.topic, origSubData.subscriberId, event);
}
}
}, null);