You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:33 UTC
[23/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
deleted file mode 100644
index 614efa1..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.handlers.MessageConsumeCallback;
-import org.apache.hedwig.client.netty.CleanupChannelMap;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.SubscriptionEventEmitter;
-import org.apache.hedwig.client.ssl.SslClientContextFactory;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * Basic HChannel Manager Implementation
- */
-public abstract class AbstractHChannelManager implements HChannelManager {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractHChannelManager.class);
-
- // Empty Topic List
- private final static Set<ByteString> EMPTY_TOPIC_SET =
- new HashSet<ByteString>();
-
- // Boolean indicating if the channel manager is running or has been closed.
- // Once we stop the manager, we should sidestep all of the connect, write callback
- // and channel disconnected logic.
- protected boolean closed = false;
- protected final ReentrantReadWriteLock closedLock =
- new ReentrantReadWriteLock();
-
- // Global counter used for generating unique transaction IDs for
- // publish and subscribe requests
- protected final AtomicLong globalCounter = new AtomicLong();
-
- // Concurrent Map to store the mapping from the Topic to the Host.
- // This could change over time since servers can drop mastership of topics
- // for load balancing or failover. If a server host ever goes down, we'd
- // also want to remove all topic mappings the host was responsible for.
- // The second Map is used as the inverted version of the first one.
- protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host =
- new ConcurrentHashMap<ByteString, InetSocketAddress>();
- // The inverse mapping is used only when clearing all topics. For performance
- // reasons, we don't guarantee that host2Topics is consistent with
- // topic2Host, so it is better not to rely on this mapping for anything
- // significant.
- protected final ConcurrentMap<InetSocketAddress, Set<ByteString>> host2Topics =
- new ConcurrentHashMap<InetSocketAddress, Set<ByteString>>();
-
- // These channels are used for publish and unsubscribe requests
- protected final CleanupChannelMap<InetSocketAddress> host2NonSubscriptionChannels =
- new CleanupChannelMap<InetSocketAddress>();
-
- private final ClientConfiguration cfg;
- // The Netty socket factory for making connections to the server.
- protected final ChannelFactory socketFactory;
- // PipelineFactory to create non-subscription netty channels to the appropriate server
- private final ClientChannelPipelineFactory nonSubscriptionChannelPipelineFactory;
- // ssl context factory
- private SslClientContextFactory sslFactory = null;
-
- // default server channel
- private final HChannel defaultServerChannel;
-
- // Each client instantiation will have a Timer for running recurring
- // tasks. One such timer task is to time out long-running
- // PubSubRequests that are waiting for an ack response from the server.
- private final Timer clientTimer = new Timer(true);
- // a common consume callback for all consume requests.
- private final MessageConsumeCallback consumeCb;
- // An event emitter to emit subscription events
- private final SubscriptionEventEmitter eventEmitter;
-
- protected AbstractHChannelManager(ClientConfiguration cfg,
- ChannelFactory socketFactory) {
- this.cfg = cfg;
- this.socketFactory = socketFactory;
- this.nonSubscriptionChannelPipelineFactory =
- new NonSubscriptionChannelPipelineFactory(cfg, this);
-
- // create a default server channel
- defaultServerChannel =
- new DefaultServerChannel(cfg.getDefaultServerHost(), this);
-
- if (cfg.isSSLEnabled()) {
- sslFactory = new SslClientContextFactory(cfg);
- }
-
- consumeCb = new MessageConsumeCallback(cfg, this);
- eventEmitter = new SubscriptionEventEmitter();
-
- // Schedule Request Timeout task.
- clientTimer.schedule(new PubSubRequestTimeoutTask(), 0,
- cfg.getTimeoutThreadRunInterval());
- }
-
- @Override
- public SubscriptionEventEmitter getSubscriptionEventEmitter() {
- return eventEmitter;
- }
-
- public MessageConsumeCallback getConsumeCallback() {
- return consumeCb;
- }
-
- public SslClientContextFactory getSslFactory() {
- return sslFactory;
- }
-
- protected ChannelFactory getChannelFactory() {
- return socketFactory;
- }
-
- protected ClientChannelPipelineFactory getNonSubscriptionChannelPipelineFactory() {
- return this.nonSubscriptionChannelPipelineFactory;
- }
-
- protected abstract ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory();
-
- @Override
- public void schedule(final TimerTask task, final long delay) {
- this.closedLock.readLock().lock();
- try {
- if (closed) {
- logger.warn("Task {} is not scheduled due to the channel manager is closed.",
- task);
- return;
- }
- clientTimer.schedule(task, delay);
- } finally {
- this.closedLock.readLock().unlock();
- }
- }
-
- @Override
- public void submitOpAfterDelay(final PubSubData pubSubData, final long delay) {
- this.closedLock.readLock().lock();
- try {
- if (closed) {
- pubSubData.getCallback().operationFailed(pubSubData.context,
- new ServiceDownException("Client has been closed."));
- return;
- }
- clientTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- logger.debug("Submit request {} in {} ms later.",
- va(pubSubData, delay));
- submitOp(pubSubData);
- }
- }, delay);
- } finally {
- closedLock.readLock().unlock();
- }
- }
-
- @Override
- public void submitOp(PubSubData pubSubData) {
- HChannel hChannel;
- if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
- OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
- hChannel = getNonSubscriptionChannelByTopic(pubSubData.topic);
- } else {
- TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
- pubSubData.subscriberId);
- hChannel = getSubscriptionChannelByTopicSubscriber(ts);
- }
- // no channel found to submit pubsub data
- // choose the default server
- if (null == hChannel) {
- hChannel = defaultServerChannel;
- }
- hChannel.submitOp(pubSubData);
- }
-
- @Override
- public void redirectToHost(PubSubData pubSubData, InetSocketAddress host) {
- logger.debug("Submit operation {} to host {}.",
- va(pubSubData, host));
- HChannel hChannel;
- if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
- OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
- hChannel = getNonSubscriptionChannel(host);
- if (null == hChannel) {
- // create a channel to connect to specified host
- hChannel = createAndStoreNonSubscriptionChannel(host);
- }
- } else {
- hChannel = getSubscriptionChannel(host);
- if (null == hChannel) {
- // create a subscription channel to specified host
- hChannel = createAndStoreSubscriptionChannel(host);
- }
- }
- // no channel found to submit pubsub data
- // choose the default server
- if (null == hChannel) {
- hChannel = defaultServerChannel;
- }
- hChannel.submitOp(pubSubData);
- }
-
- void submitOpThruChannel(PubSubData pubSubData, Channel channel) {
- logger.debug("Submit operation {} to thru channel {}.",
- va(pubSubData, channel));
- HChannel hChannel;
- if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
- OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
- hChannel = createAndStoreNonSubscriptionChannel(channel);
- } else {
- hChannel = createAndStoreSubscriptionChannel(channel);
- }
- hChannel.submitOp(pubSubData);
- }
-
- @Override
- public void submitOpToDefaultServer(PubSubData pubSubData) {
- logger.debug("Submit operation {} to default server {}.",
- va(pubSubData, defaultServerChannel));
- defaultServerChannel.submitOp(pubSubData);
- }
-
- // Store the host2Channel mapping for a non-subscription channel (if it
- // doesn't exist yet). The host is retrieved from the Channel via the
- // RemoteAddress tied to it.
- private HChannel createAndStoreNonSubscriptionChannel(Channel channel) {
- InetSocketAddress host = NetUtils.getHostFromChannel(channel);
- HChannel newHChannel = new HChannelImpl(host, channel, this,
- getNonSubscriptionChannelPipelineFactory());
- return storeNonSubscriptionChannel(host, newHChannel);
- }
-
- private HChannel createAndStoreNonSubscriptionChannel(InetSocketAddress host) {
- HChannel newHChannel = new HChannelImpl(host, this,
- getNonSubscriptionChannelPipelineFactory());
- return storeNonSubscriptionChannel(host, newHChannel);
- }
-
- private HChannel storeNonSubscriptionChannel(InetSocketAddress host,
- HChannel newHChannel) {
- return host2NonSubscriptionChannels.addChannel(host, newHChannel);
- }
-
- /**
- * Get the existing non-subscription {@link HChannel} for a given host, if any.
- *
- * @param host
- * Target host address.
- */
- private HChannel getNonSubscriptionChannel(InetSocketAddress host) {
- return host2NonSubscriptionChannels.getChannel(host);
- }
-
- /**
- * Get a non-subscription channel for a given <code>topic</code>.
- *
- * @param topic
- * Topic Name
- * @return if <code>topic</code>'s owner is unknown, return null.
- * if <code>topic</code>'s owner is known and a channel to it
- * already exists, return the existing channel, otherwise create
- * a new one.
- */
- private HChannel getNonSubscriptionChannelByTopic(ByteString topic) {
- InetSocketAddress host = topic2Host.get(topic);
- if (null == host) {
- // we don't know where is the topic
- return null;
- } else {
- // we had know which server owned the topic
- HChannel channel = getNonSubscriptionChannel(host);
- if (null == channel) {
- // create a channel to connect to specified host
- channel = createAndStoreNonSubscriptionChannel(host);
- }
- return channel;
- }
- }
-
- /**
- * Handle the disconnected event from a non-subscription {@link HChannel}.
- *
- * @param host
- * Which host is disconnected.
- * @param channel
- * The underlying established channel.
- */
- protected void onNonSubscriptionChannelDisconnected(InetSocketAddress host,
- Channel channel) {
- // Only remove the Channel from the mapping if this current
- // disconnected channel is the same as the cached entry.
- // Due to race concurrency situations, it is possible to
- // create multiple channels to the same host for publish
- // and unsubscribe requests.
- HChannel hChannel = host2NonSubscriptionChannels.getChannel(host);
- if (null == hChannel) {
- return;
- }
- Channel underlyingChannel = hChannel.getChannel();
- if (null == underlyingChannel ||
- !underlyingChannel.equals(channel)) {
- return;
- }
- logger.info("NonSubscription Channel {} to {} disconnected.",
- va(channel, host));
- // remove existed channel
- if (host2NonSubscriptionChannels.removeChannel(host, hChannel)) {
- clearAllTopicsForHost(host);
- }
- }
-
- /**
- * Create and store a subscription {@link HChannel} thru the underlying established
- * <code>channel</code>
- *
- * @param channel
- * The underlying established subscription channel.
- */
- protected abstract HChannel createAndStoreSubscriptionChannel(Channel channel);
-
- /**
- * Create and store a subscription {@link HChannel} to target host.
- *
- * @param host
- * Target host address.
- */
- protected abstract HChannel createAndStoreSubscriptionChannel(InetSocketAddress host);
-
- /**
- * Get the existing subscription {@link HChannel} for a given host, if any.
- *
- * @param host
- * Target host address.
- */
- protected abstract HChannel getSubscriptionChannel(InetSocketAddress host);
-
- /**
- * Get a subscription channel for a given <code>topicSubscriber</code>.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @return if <code>topic</code>'s owner is unknown, return null.
- * if <code>topic</code>'s owner is known and a channel to it
- * already exists, return the existing channel, otherwise create
- * a new one for the <code>topicSubscriber</code>.
- */
- protected abstract HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber);
-
- /**
- * Handle the disconnected event from a subscription {@link HChannel}.
- *
- * @param host
- * Which host is disconnected.
- * @param channel
- * The underlying established channel.
- */
- protected abstract void onSubscriptionChannelDisconnected(InetSocketAddress host,
- Channel channel);
-
- private void sendConsumeRequest(final TopicSubscriber topicSubscriber,
- final MessageSeqId messageSeqId,
- final Channel channel) {
- PubSubRequest.Builder pubsubRequestBuilder =
- NetUtils.buildConsumeRequest(nextTxnId(), topicSubscriber, messageSeqId);
-
- // For Consume requests, we will send them from the client in a fire and
- // forget manner. We are not expecting the server to send back an ack
- // response so no need to register this in the ResponseHandler. There
- // are no callbacks to invoke since this isn't a client initiated
- // action. Instead, just have a future listener that will log an error
- // message if there was a problem writing the consume request.
- logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}",
- va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
- ChannelFuture future = channel.write(pubsubRequestBuilder.build());
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}",
- va(NetUtils.getHostFromChannel(channel),
- messageSeqId, topicSubscriber));
- }
- }
- });
- }
-
- /**
- * Helper method to store the topic2Host mapping in the channel manager cache
- * map. This method is assumed to be called when we've done a successful
- * connection to the correct server topic master.
- *
- * @param topic
- * Topic Name
- * @param host
- * Host Address
- */
- protected void storeTopic2HostMapping(ByteString topic, InetSocketAddress host) {
- InetSocketAddress oldHost = topic2Host.putIfAbsent(topic, host);
- if (null != oldHost && oldHost.equals(host)) {
- // Entry in map exists for the topic but it is the same as the
- // current host. In this case there is nothing to do.
- return;
- }
-
- if (null != oldHost) {
- if (topic2Host.replace(topic, oldHost, host)) {
- // Store the relevant mappings for this topic and host combination.
- logger.debug("Storing info for topic: {}, old host: {}, new host: {}.",
- va(topic.toStringUtf8(), oldHost, host));
- clearHostForTopic(topic, oldHost);
- } else {
- logger.warn("Ownership of topic: {} has been changed from {} to {} when storeing host: {}",
- va(topic.toStringUtf8(), oldHost, topic2Host.get(topic), host));
- return;
- }
- } else {
- logger.debug("Storing info for topic: {}, host: {}.",
- va(topic.toStringUtf8(), host));
- }
- Set<ByteString> topicsForHost = host2Topics.get(host);
- if (null == topicsForHost) {
- Set<ByteString> newTopicsSet = new HashSet<ByteString>();
- topicsForHost = host2Topics.putIfAbsent(host, newTopicsSet);
- if (null == topicsForHost) {
- topicsForHost = newTopicsSet;
- }
- }
- synchronized (topicsForHost) {
- // check whether the ownership changed, since it might happened
- // after replace succeed
- if (host.equals(topic2Host.get(topic))) {
- topicsForHost.add(topic);
- }
- }
- }
-
- // If a server host goes down or the channel to it gets disconnected,
- // we want to clear out all relevant cached information. We'll
- // need to remove all of the topic mappings that the host was
- // responsible for.
- protected void clearAllTopicsForHost(InetSocketAddress host) {
- logger.debug("Clearing all topics for host: {}", host);
- // For each of the topics that the host was responsible for,
- // remove it from the topic2Host mapping.
- Set<ByteString> topicsForHost = host2Topics.get(host);
- if (null != topicsForHost) {
- synchronized (topicsForHost) {
- for (ByteString topic : topicsForHost) {
- logger.debug("Removing mapping for topic: {} from host: {}.",
- va(topic.toStringUtf8(), host));
- topic2Host.remove(topic, host);
- }
- }
- // Now it is safe to remove the host2Topics mapping entry.
- host2Topics.remove(host, topicsForHost);
- }
- }
-
- // If a subscribe channel goes down, the topic might have moved.
- // We only clear out that topic for the host and not all cached information.
- public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
- logger.debug("Clearing topic: {} from host: {}.",
- va(topic.toStringUtf8(), host));
- if (topic2Host.remove(topic, host)) {
- logger.debug("Removed topic to host mapping for topic: {} and host: {}.",
- va(topic.toStringUtf8(), host));
- }
- Set<ByteString> topicsForHost = host2Topics.get(host);
- if (null != topicsForHost) {
- boolean removed;
- synchronized (topicsForHost) {
- removed = topicsForHost.remove(topic);
- }
- if (removed) {
- logger.debug("Removed topic: {} from host: {}.",
- topic.toStringUtf8(), host);
- if (topicsForHost.isEmpty()) {
- // remove only topic list is empty
- host2Topics.remove(host, EMPTY_TOPIC_SET);
- }
- }
- }
- }
-
- @Override
- public long nextTxnId() {
- return globalCounter.incrementAndGet();
- }
-
- // We need to deal with the possible problem of a PubSub request being
- // written successfully to the server host but, for some reason, the
- // ack message never comes back. What could happen is that the VoidCallback
- // stored in the ResponseHandler.txn2PublishData map will never be called.
- // We should have a configured timeout so if that passes from the time a
- // write was successfully done to the server, we can fail this async PubSub
- // transaction. The caller could possibly redo the transaction if needed at
- // a later time. Creating a timeout cleaner TimerTask to do this here.
- class PubSubRequestTimeoutTask extends TimerTask {
- /**
- * Implement the TimerTask's abstract run method.
- */
- @Override
- public void run() {
- if (isClosed()) {
- return;
- }
- logger.debug("Running the PubSubRequest Timeout Task");
- // First check those non-subscription channels
- for (HChannel channel : host2NonSubscriptionChannels.getChannels()) {
- try {
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
- channelHandler.checkTimeoutRequests();
- } catch (NoResponseHandlerException nrhe) {
- continue;
- }
- }
- // Then check those subscription channels
- checkTimeoutRequestsOnSubscriptionChannels();
- }
- }
-
- protected abstract void restartDelivery(TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
- /**
- * Check for timed-out pub/sub requests on subscription channels.
- */
- protected abstract void checkTimeoutRequestsOnSubscriptionChannels();
-
- @Override
- public boolean isClosed() {
- closedLock.readLock().lock();
- try {
- return closed;
- } finally {
- closedLock.readLock().unlock();
- }
- }
-
- /**
- * Close all subscription channels when the channel manager is closed.
- */
- protected abstract void closeSubscriptionChannels();
-
- @Override
- public void close() {
- logger.info("Shutting down the channels manager.");
- closedLock.writeLock().lock();
- try {
- // Not first time to close
- if (closed) {
- return;
- }
- closed = true;
- } finally {
- closedLock.writeLock().unlock();
- }
- clientTimer.cancel();
- // Clear all existed channels
- host2NonSubscriptionChannels.close();
-
- // clear all subscription channels
- closeSubscriptionChannels();
-
- // Clear out all Maps
- topic2Host.clear();
- host2Topics.clear();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
deleted file mode 100644
index 7fcfc44..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.protobuf.ByteString;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.util.Either;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public abstract class AbstractSubscribeResponseHandler extends SubscribeResponseHandler {
-
- private static final Logger logger =
- LoggerFactory.getLogger(AbstractSubscribeResponseHandler.class);
-
- protected final ReentrantReadWriteLock disconnectLock =
- new ReentrantReadWriteLock();
-
- protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions
- = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>();
- protected final AbstractHChannelManager aChannelManager;
-
- protected AbstractSubscribeResponseHandler(ClientConfiguration cfg,
- HChannelManager channelManager) {
- super(cfg, channelManager);
- this.aChannelManager = (AbstractHChannelManager) channelManager;
- }
-
- protected HChannelManager getHChannelManager() {
- return this.channelManager;
- }
-
- protected ClientConfiguration getConfiguration() {
- return cfg;
- }
-
- protected ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
- return subscriptions.get(ts);
- }
-
- protected ActiveSubscriber createActiveSubscriber(
- ClientConfiguration cfg, AbstractHChannelManager channelManager,
- TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
- Channel channel, HChannel hChannel) {
- return new ActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
- }
-
- @Override
- public void handleResponse(PubSubResponse response, PubSubData pubSubData,
- Channel channel) throws Exception {
- if (logger.isDebugEnabled()) {
- logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.",
- va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
- }
- switch (response.getStatusCode()) {
- case SUCCESS:
- TopicSubscriber ts = new TopicSubscriber(pubSubData.topic,
- pubSubData.subscriberId);
- SubscriptionPreferences preferences = null;
- if (response.hasResponseBody()) {
- ResponseBody respBody = response.getResponseBody();
- if (respBody.hasSubscribeResponse()) {
- SubscribeResponse resp = respBody.getSubscribeResponse();
- if (resp.hasPreferences()) {
- preferences = resp.getPreferences();
- if (logger.isDebugEnabled()) {
- logger.debug("Receive subscription preferences for {} : {}",
- va(ts,
- SubscriptionStateUtils.toString(preferences)));
- }
- }
- }
- }
-
- Either<StatusCode, HChannel> result;
- StatusCode statusCode;
- ActiveSubscriber ss = null;
- // Store the Subscribe state
- disconnectLock.readLock().lock();
- try {
- result = handleSuccessResponse(ts, pubSubData, channel);
- statusCode = result.left();
- if (StatusCode.SUCCESS == statusCode) {
- ss = createActiveSubscriber(
- cfg, aChannelManager, ts, pubSubData, preferences, channel, result.right());
- statusCode = addSubscription(ts, ss);
- }
- } finally {
- disconnectLock.readLock().unlock();
- }
- if (StatusCode.SUCCESS == statusCode) {
- postHandleSuccessResponse(ts, ss);
- // Response was success so invoke the callback's operationFinished
- // method.
- pubSubData.getCallback().operationFinished(pubSubData.context, null);
- } else {
- PubSubException exception = PubSubException.create(statusCode,
- "Client is already subscribed for " + ts);
- pubSubData.getCallback().operationFailed(pubSubData.context, exception);
- }
- break;
- case CLIENT_ALREADY_SUBSCRIBED:
- // For Subscribe requests, the server says that the client is
- // already subscribed to it.
- pubSubData.getCallback().operationFailed(pubSubData.context,
- new ClientAlreadySubscribedException("Client is already subscribed for topic: "
- + pubSubData.topic.toStringUtf8() + ", subscriberId: "
- + pubSubData.subscriberId.toStringUtf8()));
- break;
- case SERVICE_DOWN:
- // Response was service down failure so just invoke the callback's
- // operationFailed method.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
- break;
- case NOT_RESPONSIBLE_FOR_TOPIC:
- // Redirect response so we'll need to repost the original Subscribe
- // Request
- handleRedirectResponse(response, pubSubData, channel);
- break;
- default:
- // Consider all other status codes as errors, operation failed
- // cases.
- logger.error("Unexpected error response from server for PubSubResponse: " + response);
- pubSubData.getCallback().operationFailed(pubSubData.context,
- new ServiceDownException("Server responded with a status code of: "
- + response.getStatusCode(),
- PubSubException.create(response.getStatusCode(),
- "Original Exception")));
- break;
- }
- }
-
- /**
- * Handle a success response for a specific TopicSubscriber <code>ts</code>. The method
- * is invoked after a subscribe request has succeeded.
- *
- * @param ts
- * Topic Subscriber.
- * @param pubSubData
- * Pub/Sub Request data for this subscribe request.
- * @param channel
- * Subscription Channel.
- * @return the status code indicating what happened (left) and the subscription {@link HChannel} used on success (right)
- */
- protected abstract Either<StatusCode, HChannel> handleSuccessResponse(
- TopicSubscriber ts, PubSubData pubSubData, Channel channel);
-
- protected void postHandleSuccessResponse(TopicSubscriber ts, ActiveSubscriber ss) {
- // do nothing now
- }
-
- private StatusCode addSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
- ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
- if (null != oldSS) {
- return StatusCode.CLIENT_ALREADY_SUBSCRIBED;
- } else {
- return StatusCode.SUCCESS;
- }
- }
-
- @Override
- public void handleSubscribeMessage(PubSubResponse response) {
- Message message = response.getMessage();
- TopicSubscriber ts = new TopicSubscriber(response.getTopic(),
- response.getSubscriberId());
- if (logger.isDebugEnabled()) {
- logger.debug("Handling a Subscribe message in response: {}, {}",
- va(response, ts));
- }
- ActiveSubscriber ss = getActiveSubscriber(ts);
- if (null == ss) {
- logger.error("Subscriber {} is not found receiving its message {}.",
- va(ts, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
- return;
- }
- ss.handleMessage(message);
- }
-
- @Override
- protected void asyncMessageDeliver(TopicSubscriber topicSubscriber,
- Message message) {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- logger.error("Subscriber {} is not found delivering its message {}.",
- va(topicSubscriber,
- MessageIdUtils.msgIdToReadableString(message.getMsgId())));
- return;
- }
- ss.asyncMessageDeliver(message);
- }
-
- @Override
- protected void messageConsumed(TopicSubscriber topicSubscriber,
- Message message) {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- logger.warn("Subscriber {} is not found consumed its message {}.",
- va(topicSubscriber,
- MessageIdUtils.msgIdToReadableString(message.getMsgId())));
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Message has been successfully consumed by the client app : {}, {}",
- va(message, topicSubscriber));
- }
- ss.messageConsumed(message);
- }
-
- @Override
- public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event) {
- TopicSubscriber ts = new TopicSubscriber(topic, subscriberId);
- ActiveSubscriber ss = getActiveSubscriber(ts);
- if (null == ss) {
- logger.warn("No subscription {} found receiving subscription event {}.",
- va(ts, event));
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Received subscription event {} for ({}).",
- va(event, ts));
- }
- processSubscriptionEvent(ss, event);
- }
-
- protected void processSubscriptionEvent(ActiveSubscriber as, SubscriptionEvent event) {
- switch (event) {
- // for all cases we need to resubscribe for the subscription
- case TOPIC_MOVED:
- case SUBSCRIPTION_FORCED_CLOSED:
- resubscribeIfNecessary(as, event);
- break;
- default:
- logger.error("Receive unknown subscription event {} for {}.",
- va(event, as.getTopicSubscriber()));
- }
- }
-
- @Override
- public void startDelivery(final TopicSubscriber topicSubscriber,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Start delivering message for {} using message handler {}",
- va(topicSubscriber, messageHandler));
- }
- ss.startDelivery(messageHandler);
- }
-
- @Override
- public void stopDelivery(final TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Stop delivering messages for {}", topicSubscriber);
- }
- ss.stopDelivery();
- }
-
- @Override
- public boolean hasSubscription(TopicSubscriber topicSubscriber) {
- return subscriptions.containsKey(topicSubscriber);
- }
-
- @Override
- public void consume(final TopicSubscriber topicSubscriber,
- final MessageSeqId messageSeqId) {
- ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss) {
- logger.warn("Subscriber {} is not found consuming message {}.",
- va(topicSubscriber,
- MessageIdUtils.msgIdToReadableString(messageSeqId)));
- return;
- }
- ss.consume(messageSeqId);
- }
-
- @Override
- public void onChannelDisconnected(InetSocketAddress host, Channel channel) {
- disconnectLock.writeLock().lock();
- try {
- onDisconnect(host);
- } finally {
- disconnectLock.writeLock().unlock();
- }
- }
-
- private void onDisconnect(InetSocketAddress host) {
- for (ActiveSubscriber ss : subscriptions.values()) {
- onDisconnect(ss, host);
- }
- }
-
- private void onDisconnect(ActiveSubscriber ss, InetSocketAddress host) {
- logger.info("Subscription channel for ({}) is disconnected.", ss);
- resubscribeIfNecessary(ss, SubscriptionEvent.TOPIC_MOVED);
- }
-
- protected boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
- return subscriptions.remove(ts, ss);
- }
-
- protected void resubscribeIfNecessary(ActiveSubscriber ss, SubscriptionEvent event) {
- // if subscriber has been changed, we don't need to resubscribe
- if (!removeSubscription(ss.getTopicSubscriber(), ss)) {
- return;
- }
- ss.resubscribeIfNecessary(event);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
deleted file mode 100644
index 10506d8..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import static org.apache.hedwig.util.VarArgs.va;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.MessageConsumeData;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.FilterableMessageHandler;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * an active subscriber handles subscription actions in a channel.
- */
-public class ActiveSubscriber {
-
- private static final Logger logger = LoggerFactory.getLogger(ActiveSubscriber.class);
-
- protected final ClientConfiguration cfg;
- protected final AbstractHChannelManager channelManager;
-
- // Subscriber related variables
- protected final TopicSubscriber topicSubscriber;
- protected final PubSubData op;
- protected final SubscriptionPreferences preferences;
-
- // the underlying netty channel to send request
- protected final Channel channel;
- protected final HChannel hChannel;
-
- // Counter for the number of consumed messages so far to buffer up before we
- // send the Consume message back to the server along with the last/largest
- // message seq ID seen so far in that batch.
- private int numConsumedMessagesInBuffer = 0;
- private MessageSeqId lastMessageSeqId = null;
-
- // Message Handler
- private MessageHandler msgHandler = null;
-
- // Queue used for subscribes when the MessageHandler hasn't been registered
- // yet but we've already received subscription messages from the server.
- // This will be lazily created as needed.
- private final Queue<Message> msgQueue = new LinkedList<Message>();
-
- /**
- * Construct an active subscriber instance.
- *
- * @param cfg
- * Client configuration object.
- * @param channelManager
- * Channel manager instance.
- * @param ts
- * Topic subscriber.
- * @param op
- * Pub/Sub request.
- * @param preferences
- * Subscription preferences for the subscriber.
- * @param channel
- * Netty channel the subscriber lived.
- */
- public ActiveSubscriber(ClientConfiguration cfg,
- AbstractHChannelManager channelManager,
- TopicSubscriber ts, PubSubData op,
- SubscriptionPreferences preferences,
- Channel channel,
- HChannel hChannel) {
- this.cfg = cfg;
- this.channelManager = channelManager;
- this.topicSubscriber = ts;
- this.op = op;
- this.preferences = preferences;
- this.channel = channel;
- this.hChannel = hChannel;
- }
-
- /**
- * @return pub/sub request for the subscription.
- */
- public PubSubData getPubSubData() {
- return this.op;
- }
-
- /**
- * @return topic subscriber id for the active subscriber.
- */
- public TopicSubscriber getTopicSubscriber() {
- return this.topicSubscriber;
- }
-
- /**
- * Start delivering messages using given message handler.
- *
- * @param messageHandler
- * Message handler to deliver messages
- * @throws AlreadyStartDeliveryException if someone already started delivery.
- * @throws ClientNotSubscribedException when start delivery before subscribe.
- */
- public synchronized void startDelivery(MessageHandler messageHandler)
- throws AlreadyStartDeliveryException, ClientNotSubscribedException {
- if (null != this.msgHandler) {
- throw new AlreadyStartDeliveryException("A message handler " + msgHandler
- + " has been started for " + topicSubscriber);
- }
- if (null != messageHandler && messageHandler instanceof FilterableMessageHandler) {
- FilterableMessageHandler filterMsgHandler =
- (FilterableMessageHandler) messageHandler;
- if (filterMsgHandler.hasMessageFilter()) {
- if (null == preferences) {
- // no preferences means talking to an old version hub server
- logger.warn("Start delivering messages with filter but no subscription "
- + "preferences found. It might due to talking to an old version"
- + " hub server.");
- // use the original message handler.
- messageHandler = filterMsgHandler.getMessageHandler();
- } else {
- // pass subscription preferences to message filter
- if (logger.isDebugEnabled()) {
- logger.debug("Start delivering messages with filter on {}, preferences: {}",
- va(topicSubscriber,
- SubscriptionStateUtils.toString(preferences)));
- }
- ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
- msgFilter.setSubscriptionPreferences(topicSubscriber.getTopic(),
- topicSubscriber.getSubscriberId(),
- preferences);
- }
- }
- }
-
- this.msgHandler = messageHandler;
- // Once the MessageHandler is registered, see if we have any queued up
- // subscription messages sent to us already from the server. If so,
- // consume those first. Do this only if the MessageHandler registered is
- // not null (since that would be the HedwigSubscriber.stopDelivery
- // call).
- if (null == msgHandler) {
- return;
- }
- if (msgQueue.size() > 0) {
- if (logger.isDebugEnabled()) {
- logger.debug("Consuming {} queued up messages for {}",
- va(msgQueue.size(), topicSubscriber));
- }
- for (Message message : msgQueue) {
- asyncMessageDeliver(message);
- }
- // Now we can remove the queued up messages since they are all
- // consumed.
- msgQueue.clear();
- }
- }
-
- /**
- * Stop delivering messages to the subscriber.
- */
- public synchronized void stopDelivery() {
- this.msgHandler = null;
- }
-
- /**
- * Handle received message.
- *
- * @param message
- * Received message.
- */
- public synchronized void handleMessage(Message message) {
- if (null != msgHandler) {
- asyncMessageDeliver(message);
- } else {
- // MessageHandler has not yet been registered so queue up these
- // messages for the Topic Subscription. Make the initial lazy
- // creation of the message queue thread safe just so we don't
- // run into a race condition where two simultaneous threads process
- // a received message and both try to create a new instance of
- // the message queue. Performance overhead should be okay
- // because the delivery of the topic has not even started yet
- // so these messages are not consumed and just buffered up here.
- if (logger.isDebugEnabled()) {
- logger.debug("Message {} has arrived but no MessageHandler provided for {}"
- + " yet so queueing up the message.",
- va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
- topicSubscriber));
- }
- msgQueue.add(message);
- }
- }
-
- /**
- * Deliver message to the client.
- *
- * @param message
- * Message to deliver.
- */
- public synchronized void asyncMessageDeliver(Message message) {
- if (null == msgHandler) {
- logger.error("No message handler found to deliver message {} to {}.",
- va(MessageIdUtils.msgIdToReadableString(message.getMsgId()),
- topicSubscriber));
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}",
- va(message, topicSubscriber));
- }
- unsafeDeliverMessage(message);
- }
-
- /**
- * Unsafe version to deliver message to a message handler.
- * Caller need to handle synchronization issue.
- *
- * @param message
- * Message to deliver.
- */
- protected void unsafeDeliverMessage(Message message) {
- MessageConsumeData messageConsumeData =
- new MessageConsumeData(topicSubscriber, message);
- msgHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
- message, channelManager.getConsumeCallback(),
- messageConsumeData);
- }
-
- private synchronized boolean updateLastMessageSeqId(MessageSeqId seqId) {
- if (null != lastMessageSeqId &&
- seqId.getLocalComponent() <= lastMessageSeqId.getLocalComponent()) {
- return false;
- }
- ++numConsumedMessagesInBuffer;
- lastMessageSeqId = seqId;
- if (numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) {
- numConsumedMessagesInBuffer = 0;
- lastMessageSeqId = null;
- return true;
- }
- return false;
- }
-
- /**
- * Consume a specific message.
- *
- * @param messageSeqId
- * Message seq id.
- */
- public void consume(final MessageSeqId messageSeqId) {
- PubSubRequest.Builder pubsubRequestBuilder =
- NetUtils.buildConsumeRequest(channelManager.nextTxnId(),
- topicSubscriber, messageSeqId);
-
- // For Consume requests, we will send them from the client in a fire and
- // forget manner. We are not expecting the server to send back an ack
- // response so no need to register this in the ResponseHandler. There
- // are no callbacks to invoke since this isn't a client initiated
- // action. Instead, just have a future listener that will log an error
- // message if there was a problem writing the consume request.
- if (logger.isDebugEnabled()) {
- logger.debug("Writing a Consume request to channel: {} with messageSeqId: {} for {}",
- va(channel, messageSeqId, topicSubscriber));
- }
- ChannelFuture future = channel.write(pubsubRequestBuilder.build());
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Error writing a Consume request to channel: {} with messageSeqId: {} for {}",
- va(channel, messageSeqId, topicSubscriber));
- }
- }
- });
- }
-
- /**
- * Application acked to consume message.
- *
- * @param message
- * Message consumed by application.
- */
- public void messageConsumed(Message message) {
- // For consume response to server, there is a config param on how many
- // messages to consume and buffer up before sending the consume request.
- // We just need to keep a count of the number of messages consumed
- // and the largest/latest msg ID seen so far in this batch. Messages
- // should be delivered in order and without gaps. Do this only if
- // auto-sending of consume messages is enabled.
- if (cfg.isAutoSendConsumeMessageEnabled()) {
- // Update these variables only if we are auto-sending consume
- // messages to the server. Otherwise the onus is on the client app
- // to call the Subscriber consume API to let the server know which
- // messages it has successfully consumed.
- if (updateLastMessageSeqId(message.getMsgId())) {
- // Send the consume request and reset the consumed messages buffer
- // variables. We will use the same Channel created from the
- // subscribe request for the TopicSubscriber.
- if (logger.isDebugEnabled()) {
- logger.debug("Consume message {} when reaching consumed message buffer limit.",
- message.getMsgId());
- }
- consume(message.getMsgId());
- }
- }
- }
-
- /**
- * Resubscribe a subscriber if necessary.
- *
- * @param event
- * Subscription Event.
- */
- public void resubscribeIfNecessary(SubscriptionEvent event) {
- // clear topic ownership
- if (SubscriptionEvent.TOPIC_MOVED == event) {
- channelManager.clearHostForTopic(topicSubscriber.getTopic(),
- NetUtils.getHostFromChannel(channel));
- }
- if (!op.options.getEnableResubscribe()) {
- channelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(
- topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), event);
- return;
- }
- // Since the connection to the server host that was responsible
- // for the topic died, we are not sure about the state of that
- // server. Resend the original subscribe request data to the default
- // server host/VIP. Also clear out all of the servers we've
- // contacted or attempted to from this request as we are starting a
- // "fresh" subscribe request.
- op.clearServersList();
- // Set a new type of VoidCallback for this async call. We need this
- // hook so after the resubscribe has completed, delivery for
- // that topic subscriber should also be restarted (if it was that
- // case before the channel disconnect).
- final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime();
- ResubscribeCallback resubscribeCb =
- new ResubscribeCallback(topicSubscriber, op,
- channelManager, retryWaitTime);
- op.setCallback(resubscribeCb);
- op.shouldClaim = false;
- op.context = null;
- op.setOriginalChannelForResubscribe(hChannel);
- if (logger.isDebugEnabled()) {
- logger.debug("Resubscribe {} with origSubData {}",
- va(topicSubscriber, op));
- }
- // resubmit the request
- channelManager.submitOp(op);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
deleted file mode 100644
index ab86f23..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.util.Map;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-public abstract class ClientChannelPipelineFactory implements ChannelPipelineFactory {
-
- protected ClientConfiguration cfg;
- protected AbstractHChannelManager channelManager;
-
- public ClientChannelPipelineFactory(ClientConfiguration cfg,
- AbstractHChannelManager channelManager) {
- this.cfg = cfg;
- this.channelManager = channelManager;
- }
-
- protected abstract Map<OperationType, AbstractResponseHandler> createResponseHandlers();
-
- private HChannelHandler createHChannelHandler() {
- return new HChannelHandler(cfg, channelManager, createResponseHandlers());
- }
-
- // Retrieve a ChannelPipeline from the factory.
- public ChannelPipeline getPipeline() throws Exception {
- // Create a new ChannelPipline using the factory method from the
- // Channels helper class.
- ChannelPipeline pipeline = Channels.pipeline();
- if (channelManager.getSslFactory() != null) {
- pipeline.addLast("ssl", new SslHandler(channelManager.getSslFactory().getEngine()));
- }
- pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(
- cfg.getMaximumMessageSize(), 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-
- pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance()));
- pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
- pipeline.addLast("responsehandler", createHChannelHandler());
- return pipeline;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
deleted file mode 100644
index 065b2f7..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import static org.apache.hedwig.util.VarArgs.va;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handle requests sent to default hub server. <b>DefaultServerChannel</b> would not
- * be used as a channel to send requests directly. It just takes the responsibility to
- * connect to the default server. After the underlying netty channel is established,
- * it would call {@link HChannelManager#submitOpThruChannel()} to send requests thru
- * the underlying netty channel.
- */
-class DefaultServerChannel extends HChannelImpl {
-
- private static final Logger logger = LoggerFactory.getLogger(DefaultServerChannel.class);
-
- DefaultServerChannel(InetSocketAddress host, AbstractHChannelManager channelManager) {
- super(host, channelManager);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("[DefaultServer: ").append(host).append("]");
- return sb.toString();
- }
-
- @Override
- public void submitOp(final PubSubData pubSubData) {
- // for each pub/sub request sent to default hub server
- // we would establish a fresh connection for it
- ClientChannelPipelineFactory pipelineFactory;
- if (OperationType.PUBLISH.equals(pubSubData.operationType) ||
- OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
- pipelineFactory = channelManager.getNonSubscriptionChannelPipelineFactory();
- } else {
- pipelineFactory = channelManager.getSubscriptionChannelPipelineFactory();
- }
- ChannelFuture future = connect(host, pipelineFactory);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- // If the channel has been closed, there is no need to proceed with any callback
- // logic here.
- if (closed) {
- future.getChannel().close();
- return;
- }
-
- // Check if the connection to the server was done successfully.
- if (!future.isSuccess()) {
- logger.error("Error connecting to host {}.", host);
- future.getChannel().close();
-
- retryOrFailOp(pubSubData);
- // Finished with failure logic so just return.
- return;
- }
- logger.debug("Connected to host {} for pubSubData: {}",
- va(host, pubSubData));
- channelManager.submitOpThruChannel(pubSubData, future.getChannel());
- }
- });
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
deleted file mode 100644
index c41a329..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
-import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public class HChannelHandler extends SimpleChannelHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(HChannelHandler.class);
-
- // Concurrent Map to store for each async PubSub request, the txn ID
- // and the corresponding PubSub call's data which stores the VoidCallback to
- // invoke when we receive a PubSub ack response from the server.
- // This is specific to this instance of the HChannelHandler which is
- // tied to a specific netty Channel Pipeline.
- private final ConcurrentMap<Long, PubSubData> txn2PubSubData =
- new ConcurrentHashMap<Long, PubSubData>();
-
- // Boolean indicating if we closed the channel this HChannelHandler is
- // attached to explicitly or not. If so, we do not need to do the
- // channel disconnected logic here.
- private volatile boolean channelClosedExplicitly = false;
-
- private final AbstractHChannelManager channelManager;
- private final ClientConfiguration cfg;
-
- private final Map<OperationType, AbstractResponseHandler> handlers;
- private final SubscribeResponseHandler subHandler;
-
- public HChannelHandler(ClientConfiguration cfg,
- AbstractHChannelManager channelManager,
- Map<OperationType, AbstractResponseHandler> handlers) {
- this.cfg = cfg;
- this.channelManager = channelManager;
- this.handlers = handlers;
- subHandler = (SubscribeResponseHandler) handlers.get(OperationType.SUBSCRIBE);
- }
-
- public SubscribeResponseHandler getSubscribeResponseHandler() {
- return subHandler;
- }
-
- public void removeTxn(long txnId) {
- txn2PubSubData.remove(txnId);
- }
-
- public void addTxn(long txnId, PubSubData pubSubData) {
- txn2PubSubData.put(txnId, pubSubData);
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- // If the Message is not a PubSubResponse, just send it upstream and let
- // something else handle it.
- if (!(e.getMessage() instanceof PubSubResponse)) {
- ctx.sendUpstream(e);
- return;
- }
- // Retrieve the PubSubResponse from the Message that was sent by the
- // server.
- PubSubResponse response = (PubSubResponse) e.getMessage();
- logger.debug("Response received from host: {}, response: {}.",
- va(NetUtils.getHostFromChannel(ctx.getChannel()), response));
-
- // Determine if this PubSubResponse is an ack response for a PubSub
- // Request or if it is a message being pushed to the client subscriber.
- if (response.hasMessage()) {
- // Subscribed messages being pushed to the client so handle/consume
- // it and return.
- if (null == subHandler) {
- logger.error("Received message from a non-subscription channel : {}",
- response);
- } else {
- subHandler.handleSubscribeMessage(response);
- }
- return;
- }
-
- // Process Subscription Events
- if (response.hasResponseBody()) {
- ResponseBody resp = response.getResponseBody();
- // A special subscription event indicates the state of a subscriber
- if (resp.hasSubscriptionEvent()) {
- if (null == subHandler) {
- logger.error("Received subscription event from a non-subscription channel : {}",
- response);
- } else {
- SubscriptionEventResponse eventResp = resp.getSubscriptionEvent();
- logger.debug("Received subscription event {} for (topic:{}, subscriber:{}).",
- va(eventResp.getEvent(), response.getTopic(),
- response.getSubscriberId()));
- subHandler.handleSubscriptionEvent(response.getTopic(),
- response.getSubscriberId(),
- eventResp.getEvent());
- }
- return;
- }
- }
-
- // Response is an ack to a prior PubSubRequest so first retrieve the
- // PubSub data for this txn.
- PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
-
- // Validate that the PubSub data for this txn is stored. If not, just
- // log an error message and return since we don't know how to handle
- // this.
- if (pubSubData == null) {
- logger.error("PubSub Data was not found for PubSubResponse: {}", response);
- return;
- }
-
- // Store the topic2Host mapping if this wasn't a server redirect. We'll
- // assume that if the server was able to have an open Channel connection
- // to the client, and responded with an ack message other than the
- // NOT_RESPONSIBLE_FOR_TOPIC one, it is the correct topic master.
- if (!response.getStatusCode().equals(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
- // Retrieve the server host that we've connected to and store the
- // mapping from the topic to this host. For all other non-redirected
- // server statuses, we consider that as a successful connection to the
- // correct topic master.
- InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
- channelManager.storeTopic2HostMapping(pubSubData.topic, host);
- }
-
- // Depending on the operation type, call the appropriate handler.
- logger.debug("Handling a {} response: {}, pubSubData: {}, host: {}.",
- va(pubSubData.operationType, response, pubSubData, ctx.getChannel()));
- AbstractResponseHandler respHandler = handlers.get(pubSubData.operationType);
- if (null == respHandler) {
- // The above are the only expected PubSubResponse messages received
- // from the server for the various client side requests made.
- logger.error("Response received from server is for an unhandled operation {}, txnId: {}.",
- va(pubSubData.operationType, response.getTxnId()));
- pubSubData.getCallback().operationFailed(pubSubData.context,
- new UnexpectedConditionException("Can't find response handler for operation "
- + pubSubData.operationType));
- return;
- }
- respHandler.handleResponse(response, pubSubData, ctx.getChannel());
- }
-
- public void checkTimeoutRequests() {
- long curTime = System.currentTimeMillis();
- long timeoutInterval = cfg.getServerAckResponseTimeout();
- for (PubSubData pubSubData : txn2PubSubData.values()) {
- checkTimeoutRequest(pubSubData, curTime, timeoutInterval);
- }
- }
-
- private void checkTimeoutRequest(PubSubData pubSubData,
- long curTime, long timeoutInterval) {
- if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
- // Current PubSubRequest has timed out so remove it from the
- // ResponseHandler's map and invoke the VoidCallback's
- // operationFailed method.
- logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
- txn2PubSubData.remove(pubSubData.txnId);
- pubSubData.getCallback().operationFailed(pubSubData.context,
- new UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
- }
- }
-
- // Logic to deal with what happens when a Channel to a server host is
- // disconnected.
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- // If this channel was closed explicitly by the client code,
- // we do not need to do any of this logic. This could happen
- // for redundant Publish channels created or redirected subscribe
- // channels that are not used anymore or when we shutdown the
- // client and manually close all of the open channels.
- // Also don't do any of the disconnect logic if the client has stopped.
- if (channelClosedExplicitly || channelManager.isClosed()) {
- return;
- }
-
- // Make sure the host retrieved is not null as there could be some weird
- // channel disconnect events happening during a client shutdown.
- // If it is, just return as there shouldn't be anything we need to do.
- InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
- if (host == null) {
- return;
- }
-
- logger.info("Channel {} was disconnected to host {}.",
- va(ctx.getChannel(), host));
-
- // If this Channel was used for Publish and Unsubscribe flows, just
- // remove it from the HewdigPublisher's host2Channel map. We will
- // re-establish a Channel connection to that server when the next
- // publish/unsubscribe request to a topic that the server owns occurs.
-
- // Now determine what type of operation this channel was used for.
- if (null == subHandler) {
- channelManager.onNonSubscriptionChannelDisconnected(host, ctx.getChannel());
- } else {
- channelManager.onSubscriptionChannelDisconnected(host, ctx.getChannel());
- }
-
- // Finally, all of the PubSubRequests that are still waiting for an ack
- // response from the server need to be removed and timed out. Invoke the
- // operationFailed callbacks on all of them. Use the
- // UncertainStateException since the server did receive the request but
- // we're not sure of the state of the request since the ack response was
- // never received.
- for (PubSubData pubSubData : txn2PubSubData.values()) {
- logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: {}",
- pubSubData);
- pubSubData.getCallback().operationFailed(pubSubData.context, new UncertainStateException(
- "Server ack response never received before server connection disconnected!"));
- }
- txn2PubSubData.clear();
- }
-
- // Logic to deal with what happens when a Channel to a server host is
- // connected. This is needed if the client is using an SSL port to
- // communicate with the server. If so, we need to do the SSL handshake here
- // when the channel is first connected.
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- // No need to initiate the SSL handshake if we are closing this channel
- // explicitly or the client has been stopped.
- if (cfg.isSSLEnabled() && !channelClosedExplicitly && !channelManager.isClosed()) {
- logger.debug("Initiating the SSL handshake");
- ctx.getPipeline().get(SslHandler.class).handshake();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- logger.error("Exception caught on client channel", e.getCause());
- e.getChannel().close();
- }
-
- public void closeExplicitly() {
- // TODO: BOOKKEEPER-350 : Handle consume buffering, etc here - in a different patch
- channelClosedExplicitly = true;
- }
-}