You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:34 UTC
[24/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
deleted file mode 100644
index 6b2a965..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import java.net.InetSocketAddress;
-
-
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.util.Callback;
-
-/**
- * An interface provided to manage all subscriptions on a channel.
- *
- * Its responsibility is to handle all subscribe responses received on that channel,
- * clear up subscriptions and retry reconnectin subscriptions when channel disconnected,
- * and handle delivering messages to {@link MessageHandler} and sent consume messages
- * back to hub servers.
- */
-public abstract class SubscribeResponseHandler extends AbstractResponseHandler {
-
- protected SubscribeResponseHandler(ClientConfiguration cfg,
- HChannelManager channelManager) {
- super(cfg, channelManager);
- }
-
- /**
- * Handle Message delivered by the server.
- *
- * @param response
- * Message received from the server.
- */
- public abstract void handleSubscribeMessage(PubSubResponse response);
-
- /**
- * Handle a subscription event delivered by the server.
- *
- * @param topic
- * Topic Name
- * @param subscriberId
- * Subscriber Id
- * @param event
- * Subscription Event describes its status
- */
- public abstract void handleSubscriptionEvent(ByteString topic,
- ByteString subscriberId,
- SubscriptionEvent event);
-
- /**
- * Method called when a message arrives for a subscribe Channel and we want
- * to deliver it asynchronously via the registered MessageHandler (should
- * not be null when called here).
- *
- * @param message
- * Message from Subscribe Channel we want to consume.
- */
- protected abstract void asyncMessageDeliver(TopicSubscriber topicSubscriber,
- Message message);
-
- /**
- * Method called when the client app's MessageHandler has asynchronously
- * completed consuming a subscribed message sent from the server. The
- * contract with the client app is that messages sent to the handler to be
- * consumed will have the callback response done in the same order. So if we
- * asynchronously call the MessageHandler to consume messages #1-5, that
- * should call the messageConsumed method here via the VoidCallback in the
- * same order. To make this thread safe, since multiple outstanding messages
- * could be consumed by the client app and then called back to here, make
- * this method synchronized.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @param message
- * Message sent from server for topic subscription that has been
- * consumed by the client.
- */
- protected abstract void messageConsumed(TopicSubscriber topicSubscriber,
- Message message);
-
- /**
- * Start delivering messages for a given topic subscriber.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @param messageHandler
- * MessageHandler to register for this ResponseHandler instance.
- * @throws ClientNotSubscribedException
- * If the client is not currently subscribed to the topic
- * @throws AlreadyStartDeliveryException
- * If someone started delivery a message handler before stopping existed one.
- */
- public abstract void startDelivery(TopicSubscriber topicSubscriber,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
- /**
- * Stop delivering messages for a given topic subscriber.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @throws ClientNotSubscribedException
- * If the client is not currently subscribed to the topic
- */
- public abstract void stopDelivery(TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException;
-
- /**
- * Whether the given topic subscriber subscribed thru this handler.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @return whether the given topic subscriber subscribed thru this handler.
- */
- public abstract boolean hasSubscription(TopicSubscriber topicSubscriber);
-
- /**
- * Close subscription from this handler.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @param callback
- * Callback when the subscription is closed.
- * @param context
- * Callback context.
- */
- public abstract void asyncCloseSubscription(TopicSubscriber topicSubscriber,
- Callback<ResponseBody> callback,
- Object context);
-
- /**
- * Consume a given message for given topic subscriber thru this handler.
- *
- * @param topicSubscriber
- * Topic Subscriber
- */
- public abstract void consume(TopicSubscriber topicSubscriber,
- MessageSeqId messageSeqId);
-
- /**
- * This method is called when the underlying channel is disconnected due to server failure.
- *
- * The implementation should take the responsibility to clear subscriptions and retry
- * reconnecting subscriptions to new hub servers.
- *
- * @param host
- * Host that channel connected to has disconnected.
- * @param channel
- * Channel connected to.
- */
- public abstract void onChannelDisconnected(InetSocketAddress host,
- Channel channel);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
deleted file mode 100644
index 0d3e8a6..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-
-public class UnsubscribeResponseHandler extends AbstractResponseHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(UnsubscribeResponseHandler.class);
-
- public UnsubscribeResponseHandler(ClientConfiguration cfg,
- HChannelManager channelManager) {
- super(cfg, channelManager);
- }
-
- @Override
- public void handleResponse(final PubSubResponse response, final PubSubData pubSubData,
- final Channel channel)
- throws Exception {
- switch (response.getStatusCode()) {
- case SUCCESS:
- // since for unsubscribe request, we close subscription first
- // for now, we don't need to do anything now.
- pubSubData.getCallback().operationFinished(pubSubData.context, null);
- break;
- case CLIENT_NOT_SUBSCRIBED:
- // For Unsubscribe requests, the server says that the client was
- // never subscribed to the topic.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ClientNotSubscribedException(
- "Client was never subscribed to topic: " +
- pubSubData.topic.toStringUtf8() + ", subscriberId: " +
- pubSubData.subscriberId.toStringUtf8()));
- break;
- case SERVICE_DOWN:
- // Response was service down failure so just invoke the callback's
- // operationFailed method.
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
- break;
- case NOT_RESPONSIBLE_FOR_TOPIC:
- // Redirect response so we'll need to repost the original
- // Unsubscribe Request
- handleRedirectResponse(response, pubSubData, channel);
- break;
- default:
- // Consider all other status codes as errors, operation failed
- // cases.
- logger.error("Unexpected error response from server for PubSubResponse: " + response);
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a status code of: " +
- response.getStatusCode()));
- break;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
deleted file mode 100644
index a6ba2a6..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CleanupChannelMap<T> {
-
- private static final Logger logger = LoggerFactory.getLogger(CleanupChannelMap.class);
-
- private final ConcurrentHashMap<T, HChannel> channels;
-
- // Boolean indicating if the channel map is closed or not.
- protected boolean closed = false;
- protected final ReentrantReadWriteLock closedLock =
- new ReentrantReadWriteLock();
-
- public CleanupChannelMap() {
- channels = new ConcurrentHashMap<T, HChannel>();
- }
-
- /**
- * Add channel to the map. If an old channel has been bound
- * to <code>key</code>, the <code>channel</code> would be
- * closed immediately and the old channel is returned. Otherwise,
- * the <code>channel</code> is put in the map for future usage.
- *
- * If the channel map has been closed, the channel would be closed
- * immediately.
- *
- * @param key
- * Key
- * @param channel
- * Channel
- * @return the channel instance to use.
- */
- public HChannel addChannel(T key, HChannel channel) {
- this.closedLock.readLock().lock();
- try {
- if (closed) {
- channel.close();
- return channel;
- }
- HChannel oldChannel = channels.putIfAbsent(key, channel);
- if (null != oldChannel) {
- logger.info("Channel for {} already exists, so no need to store it.", key);
- channel.close();
- return oldChannel;
- } else {
- logger.debug("Storing a new channel for {}.", key);
- return channel;
- }
- } finally {
- this.closedLock.readLock().unlock();
- }
- }
-
- /**
- * Replace channel only if currently mapped to the given <code>oldChannel</code>.
- *
- * @param key
- * Key
- * @param oldChannel
- * Old Channel
- * @param newChannel
- * New Channel
- * @return true if replaced successfully, otherwise false.
- */
- public boolean replaceChannel(T key, HChannel oldChannel, HChannel newChannel) {
- this.closedLock.readLock().lock();
- try {
- if (closed) {
- if (null != oldChannel) oldChannel.close();
- if (null != newChannel) newChannel.close();
- return false;
- }
- if (null == oldChannel) {
- HChannel existedChannel = channels.putIfAbsent(key, newChannel);
- if (null != existedChannel) {
- logger.info("Channel for {} already exists, so no need to replace it.", key);
- newChannel.close();
- return false;
- } else {
- logger.debug("Storing a new channel for {}.", key);
- return true;
- }
- } else {
- if (channels.replace(key, oldChannel, newChannel)) {
- logger.debug("Replacd channel {} for {}.", oldChannel, key);
- oldChannel.close();
- return true;
- } else {
- newChannel.close();
- return false;
- }
- }
- } finally {
- this.closedLock.readLock().unlock();
- }
- }
-
- /**
- * Returns the channel bound with <code>key</code>.
- *
- * @param key Key
- * @return the channel bound with <code>key</code>.
- */
- public HChannel getChannel(T key) {
- return channels.get(key);
- }
-
- /**
- * Remove the channel bound with <code>key</code>.
- *
- * @param key Key
- * @return the channel bound with <code>key</code>, null if no channel
- * is bound with <code>key</code>.
- */
- public HChannel removeChannel(T key) {
- return channels.remove(key);
- }
-
- /**
- * Remove the channel bound with <code>key</code>.
- *
- * @param key Key
- * @param channel The channel expected to be bound with <code>key</code>.
- * @return true if the channel is removed, false otherwise.
- */
- public boolean removeChannel(T key, HChannel channel) {
- return channels.remove(key, channel);
- }
-
- /**
- * Return the channels in the map.
- *
- * @return the set of channels.
- */
- public Collection<HChannel> getChannels() {
- return channels.values();
- }
-
- /**
- * Close the channels map.
- */
- public void close() {
- closedLock.writeLock().lock();
- try {
- if (closed) {
- return;
- }
- closed = true;
- } finally {
- closedLock.writeLock().unlock();
- }
- logger.debug("Closing channels map.");
- for (HChannel channel : channels.values()) {
- channel.close(true);
- }
- channels.clear();
- logger.debug("Closed channels map.");
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
deleted file mode 100644
index 94e0a80..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Handlers used by a subscription.
- */
-public class FilterableMessageHandler implements MessageHandler {
-
- MessageHandler msgHandler;
- ClientMessageFilter msgFilter;
-
- public FilterableMessageHandler(MessageHandler msgHandler,
- ClientMessageFilter msgFilter) {
- this.msgHandler = msgHandler;
- this.msgFilter = msgFilter;
- }
-
- public boolean hasMessageHandler() {
- return null != msgHandler;
- }
-
- public MessageHandler getMessageHandler() {
- return msgHandler;
- }
-
- public boolean hasMessageFilter() {
- return null != msgFilter;
- }
-
- public ClientMessageFilter getMessageFilter() {
- return msgFilter;
- }
-
- @Override
- public void deliver(ByteString topic, ByteString subscriberId, Message msg,
- Callback<Void> callback, Object context) {
- boolean deliver = true;
- if (hasMessageFilter()) {
- deliver = msgFilter.testMessage(msg);
- }
- if (deliver) {
- msgHandler.deliver(topic, subscriberId, msg, callback, context);
- } else {
- callback.operationFinished(context, null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
deleted file mode 100644
index 340cec5..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import org.jboss.netty.channel.Channel;
-import org.apache.hedwig.client.data.PubSubData;
-
-/**
- * A wrapper interface over netty {@link Channel} to submit hedwig's
- * {@link PubSubData} requests.
- */
-public interface HChannel {
-
- /**
- * Submit a pub/sub request.
- *
- * @param op
- * Pub/Sub Request.
- */
- public void submitOp(PubSubData op);
-
- /**
- * @return underlying netty channel
- */
- public Channel getChannel();
-
- /**
- * Close the channel without waiting.
- */
- public void close();
-
- /**
- * Close the channel
- *
- * @param wait
- * Whether wait until the channel is closed.
- */
- public void close(boolean wait);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
deleted file mode 100644
index 6fae6bb..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.net.InetSocketAddress;
-import java.util.TimerTask;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-
-/**
- * A manager manages 1) all channels established to hub servers,
- * 2) the actions taken by the topic subscribers.
- */
-public interface HChannelManager {
-
- /**
- * Submit a pub/sub request after a given <code>delay</code>.
- *
- * @param op
- * Pub/Sub Request.
- * @param delay
- * Delay time in ms.
- */
- public void submitOpAfterDelay(PubSubData op, long delay);
-
- /**
- * Submit a pub/sub request.
- *
- * @param pubSubData
- * Pub/Sub Request.
- */
- public void submitOp(PubSubData pubSubData);
-
- /**
- * Submit a pub/sub request to default server.
- *
- * @param pubSubData
- * Pub/Sub request.
- */
- public void submitOpToDefaultServer(PubSubData pubSubData);
-
- /**
- * Submit a pub/sub request to a given host.
- *
- * @param pubSubData
- * Pub/Sub request.
- * @param host
- * Given host address.
- */
- public void redirectToHost(PubSubData pubSubData, InetSocketAddress host);
-
- /**
- * Generate next transaction id for pub/sub request sending thru this manager.
- *
- * @return next transaction id.
- */
- public long nextTxnId();
-
- /**
- * Schedule a timer task after a given <code>delay</code>.
- *
- * @param task
- * A timer task
- * @param delay
- * Delay time in ms.
- */
- public void schedule(TimerTask task, long delay);
-
- /**
- * Get the subscribe response handler managed the given <code>topicSubscriber</code>.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @return subscribe response handler managed it, otherwise return null.
- */
- public SubscribeResponseHandler getSubscribeResponseHandler(
- TopicSubscriber topicSubscriber);
-
- /**
- * Start delivering messages for a given topic subscriber.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @param messageHandler
- * MessageHandler to register for this ResponseHandler instance.
- * @throws ClientNotSubscribedException
- * If the client is not currently subscribed to the topic
- * @throws AlreadyStartDeliveryException
- * If someone started delivery a message handler before stopping existed one.
- */
- public void startDelivery(TopicSubscriber topicSubscriber,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
- /**
- * Stop delivering messages for a given topic subscriber.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @throws ClientNotSubscribedException
- * If the client is not currently subscribed to the topic
- */
- public void stopDelivery(TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException;
-
- /**
- * Close the subscription of the given <code>topicSubscriber</code>.
- *
- * @param topicSubscriber
- * Topic Subscriber
- * @param callback
- * Callback
- * @param context
- * Callback context
- */
- public void asyncCloseSubscription(TopicSubscriber topicSubscriber,
- Callback<ResponseBody> callback,
- Object context);
-
- /**
- * Return the subscription event emitter to emit subscription events.
- *
- * @return subscription event emitter.
- */
- public SubscriptionEventEmitter getSubscriptionEventEmitter();
-
- /**
- * Is the channel manager closed.
- *
- * @return true if the channel manager is closed, otherwise return false.
- */
- public boolean isClosed();
-
- /**
- * Close the channel manager.
- */
- public void close();
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
deleted file mode 100644
index 8ae0e82..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.concurrent.Executors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager;
-import org.apache.hedwig.client.netty.impl.multiplex.MultiplexHChannelManager;
-
-/**
- * This is a top level Hedwig Client class that encapsulates the common
- * functionality needed for both Publish and Subscribe operations.
- *
- */
-public class HedwigClientImpl implements Client {
-
- private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
-
- // The Netty socket factory for making connections to the server.
- protected final ChannelFactory socketFactory;
- // Whether the socket factory is one we created or is owned by whoever
- // instantiated us.
- protected boolean ownChannelFactory = false;
-
- // channel manager manages all the channels established by the client
- protected final HChannelManager channelManager;
-
- private HedwigSubscriber sub;
- private final HedwigPublisher pub;
- private final ClientConfiguration cfg;
-
- public static Client create(ClientConfiguration cfg) {
- return new HedwigClientImpl(cfg);
- }
-
- public static Client create(ClientConfiguration cfg, ChannelFactory socketFactory) {
- return new HedwigClientImpl(cfg, socketFactory);
- }
-
- // Base constructor that takes in a Configuration object.
- // This will create its own client socket channel factory.
- protected HedwigClientImpl(ClientConfiguration cfg) {
- this(cfg, new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat("HedwigClient-NIOBoss-%d").build()),
- Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat("HedwigClient-NIOWorker-%d").build())));
- ownChannelFactory = true;
- }
-
- // Constructor that takes in a Configuration object and a ChannelFactory
- // that has already been instantiated by the caller.
- protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
- this.cfg = cfg;
- this.socketFactory = socketFactory;
- if (cfg.isSubscriptionChannelSharingEnabled()) {
- channelManager = new MultiplexHChannelManager(cfg, socketFactory);
- } else {
- channelManager = new SimpleHChannelManager(cfg, socketFactory);
- }
- pub = new HedwigPublisher(this);
- sub = new HedwigSubscriber(this);
- }
-
- public ClientConfiguration getConfiguration() {
- return cfg;
- }
-
- public HChannelManager getHChannelManager() {
- return channelManager;
- }
-
- public HedwigSubscriber getSubscriber() {
- return sub;
- }
-
- // Protected method to set the subscriber. This is needed currently for hub
- // versions of the client subscriber.
- protected void setSubscriber(HedwigSubscriber sub) {
- this.sub = sub;
- }
-
- public HedwigPublisher getPublisher() {
- return pub;
- }
-
- // When we are done with the client, this is a clean way to gracefully close
- // all channels/sockets created by the client and to also release all
- // resources used by netty.
- public void close() {
- logger.info("Stopping the client!");
-
- // close channel manager to release all channels
- channelManager.close();
-
- // Release resources used by the ChannelFactory on the client if we are
- // the owner that created it.
- if (ownChannelFactory) {
- socketFactory.releaseExternalResources();
- }
- logger.info("Completed stopping the client!");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
deleted file mode 100644
index 271d1eb..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.handlers.PubSubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-
-/**
- * This is the Hedwig Netty specific implementation of the Publisher interface.
- *
- */
-public class HedwigPublisher implements Publisher {
-
- private static final Logger logger = LoggerFactory.getLogger(HedwigPublisher.class);
-
- private final HChannelManager channelManager;
-
- protected HedwigPublisher(HedwigClientImpl client) {
- this.channelManager = client.getHChannelManager();
- }
-
- public PublishResponse publish(ByteString topic, Message msg)
- throws CouldNotConnectException, ServiceDownException {
-
- if (logger.isDebugEnabled()) {
- logger.debug("Calling a sync publish for topic: {}, msg: {}.",
- topic.toStringUtf8(), msg);
- }
- PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, null, null);
- synchronized (pubSubData) {
- PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
- asyncPublishWithResponseImpl(topic, msg, pubSubCallback, null);
- try {
- while (!pubSubData.isDone)
- pubSubData.wait();
- } catch (InterruptedException e) {
- throw new ServiceDownException("Interrupted Exception while waiting for async publish call");
- }
- // Check from the PubSubCallback if it was successful or not.
- if (!pubSubCallback.getIsCallSuccessful()) {
- // See what the exception was that was thrown when the operation
- // failed.
- PubSubException failureException = pubSubCallback.getFailureException();
- if (failureException == null) {
- // This should not happen as the operation failed but a null
- // PubSubException was passed. Log a warning message but
- // throw a generic ServiceDownException.
- logger.error("Sync Publish operation failed but no PubSubException was passed!");
- throw new ServiceDownException("Server ack response to publish request is not successful");
- }
- // For the expected exceptions that could occur, just rethrow
- // them.
- else if (failureException instanceof CouldNotConnectException) {
- throw (CouldNotConnectException) failureException;
- } else if (failureException instanceof ServiceDownException) {
- throw (ServiceDownException) failureException;
- } else {
- // For other types of PubSubExceptions, just throw a generic
- // ServiceDownException but log a warning message.
- logger.error("Unexpected exception type when a sync publish operation failed: ",
- failureException);
- throw new ServiceDownException("Server ack response to publish request is not successful");
- }
- }
-
- ResponseBody respBody = pubSubCallback.getResponseBody();
- if (null == respBody) {
- return null;
- }
- return respBody.hasPublishResponse() ? respBody.getPublishResponse() : null;
- }
- }
-
- public void asyncPublish(ByteString topic, Message msg,
- final Callback<Void> callback, Object context) {
- asyncPublishWithResponseImpl(topic, msg,
- new VoidCallbackAdapter<ResponseBody>(callback), context);
- }
-
- public void asyncPublishWithResponse(ByteString topic, Message msg,
- Callback<PublishResponse> callback,
- Object context) {
- // adapt the callback.
- asyncPublishWithResponseImpl(topic, msg,
- new PublishResponseCallbackAdapter(callback), context);
- }
-
- private void asyncPublishWithResponseImpl(ByteString topic, Message msg,
- Callback<ResponseBody> callback,
- Object context) {
- if (logger.isDebugEnabled()) {
- logger.debug("Calling an async publish for topic: {}, msg: {}.",
- topic.toStringUtf8(), msg);
- }
- PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null,
- callback, context);
- channelManager.submitOp(pubSubData);
- }
-
- private static class PublishResponseCallbackAdapter implements Callback<ResponseBody>{
-
- private final Callback<PublishResponse> delegate;
-
- private PublishResponseCallbackAdapter(Callback<PublishResponse> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
- if (null == resultOfOperation) {
- delegate.operationFinished(ctx, null);
- } else {
- delegate.operationFinished(ctx, resultOfOperation.getPublishResponse());
- }
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- delegate.operationFailed(ctx, exception);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
deleted file mode 100644
index 0eda290..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.handlers.PubSubCallback;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.SubscriptionListener;
-
-/**
- * This is the Hedwig Netty specific implementation of the Subscriber interface.
- *
- */
-@SuppressWarnings("deprecation") // so that we can implemented the Deprecated subscribe methods without a warning
-public class HedwigSubscriber implements Subscriber {
-
- private static final Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
-
- protected final ClientConfiguration cfg;
- protected final HChannelManager channelManager;
-
- public HedwigSubscriber(HedwigClientImpl client) {
- this.cfg = client.getConfiguration();
- this.channelManager = client.getHChannelManager();
- }
-
- public void addSubscriptionListener(SubscriptionListener listener) {
- channelManager.getSubscriptionEventEmitter()
- .addSubscriptionListener(listener);
- }
-
- public void removeSubscriptionListener(SubscriptionListener listener) {
- channelManager.getSubscriptionEventEmitter()
- .removeSubscriptionListener(listener);
- }
-
- // Private method that holds the common logic for doing synchronous
- // Subscribe or Unsubscribe requests. This is for code reuse since these
- // two flows are very similar. The assumption is that the input
- // OperationType is either SUBSCRIBE or UNSUBSCRIBE.
- private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
- SubscriptionOptions options)
- throws CouldNotConnectException, ClientAlreadySubscribedException,
- ClientNotSubscribedException, ServiceDownException {
- if (logger.isDebugEnabled()) {
- StringBuilder debugMsg = new StringBuilder().append("Calling a sync subUnsub request for topic: ")
- .append(topic.toStringUtf8()).append(", subscriberId: ")
- .append(subscriberId.toStringUtf8()).append(", operationType: ")
- .append(operationType);
- if (null != options) {
- debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach())
- .append(", messageBound: ").append(options.getMessageBound());
- }
- logger.debug(debugMsg.toString());
- }
- PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
- synchronized (pubSubData) {
- PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
- asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
- try {
- while (!pubSubData.isDone)
- pubSubData.wait();
- } catch (InterruptedException e) {
- throw new ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
- }
- // Check from the PubSubCallback if it was successful or not.
- if (!pubSubCallback.getIsCallSuccessful()) {
- // See what the exception was that was thrown when the operation
- // failed.
- PubSubException failureException = pubSubCallback.getFailureException();
- if (failureException == null) {
- // This should not happen as the operation failed but a null
- // PubSubException was passed. Log a warning message but
- // throw a generic ServiceDownException.
- logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
- throw new ServiceDownException("Server ack response to SubUnsub request is not successful");
- }
- // For the expected exceptions that could occur, just rethrow
- // them.
- else if (failureException instanceof CouldNotConnectException)
- throw (CouldNotConnectException) failureException;
- else if (failureException instanceof ClientAlreadySubscribedException)
- throw (ClientAlreadySubscribedException) failureException;
- else if (failureException instanceof ClientNotSubscribedException)
- throw (ClientNotSubscribedException) failureException;
- else if (failureException instanceof ServiceDownException)
- throw (ServiceDownException) failureException;
- else {
- logger.error("Unexpected PubSubException thrown: ", failureException);
- // Throw a generic ServiceDownException but wrap the
- // original PubSubException within it.
- throw new ServiceDownException(failureException);
- }
- }
- }
- }
-
- // Private method that holds the common logic for doing asynchronous
- // Subscribe or Unsubscribe requests. This is for code reuse since these two
- // flows are very similar. The assumption is that the input OperationType is
- // either SUBSCRIBE or UNSUBSCRIBE.
- private void asyncSubUnsub(ByteString topic, ByteString subscriberId,
- Callback<ResponseBody> callback, Object context,
- OperationType operationType, SubscriptionOptions options) {
- if (logger.isDebugEnabled()) {
- StringBuilder debugMsg = new StringBuilder().append("Calling a async subUnsub request for topic: ")
- .append(topic.toStringUtf8()).append(", subscriberId: ")
- .append(subscriberId.toStringUtf8()).append(", operationType: ")
- .append(operationType);
- if (null != options) {
- debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach())
- .append(", messageBound: ").append(options.getMessageBound());
- }
- logger.debug(debugMsg.toString());
- }
- if (OperationType.SUBSCRIBE.equals(operationType)) {
- if (options.getMessageBound() <= 0 &&
- cfg.getSubscriptionMessageBound() > 0) {
- SubscriptionOptions.Builder soBuilder =
- SubscriptionOptions.newBuilder(options).setMessageBound(
- cfg.getSubscriptionMessageBound());
- options = soBuilder.build();
- }
- }
- PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType,
- options, callback, context);
- channelManager.submitOp(pubSubData);
- }
-
- public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
- throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
- SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
- subscribe(topic, subscriberId, options, false);
- }
-
- public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
- throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
- subscribe(topic, subscriberId, options, false);
- }
-
- protected void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options, boolean isHub)
- throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
- // Validate that the format of the subscriberId is valid either as a
- // local or hub subscriber.
- if (!isValidSubscriberId(subscriberId, isHub)) {
- throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
- + ", isHub: " + isHub);
- }
- try {
- subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
- } catch (ClientNotSubscribedException e) {
- logger.error("Unexpected Exception thrown: ", e);
- // This exception should never be thrown here. But just in case,
- // throw a generic ServiceDownException but wrap the original
- // Exception within it.
- throw new ServiceDownException(e);
- }
- }
-
- public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
- Object context) {
- SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
- asyncSubscribe(topic, subscriberId, options, callback, context, false);
- }
-
- public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
- Callback<Void> callback, Object context) {
- asyncSubscribe(topic, subscriberId, options, callback, context, false);
- }
-
- protected void asyncSubscribe(ByteString topic, ByteString subscriberId,
- SubscriptionOptions options,
- Callback<Void> callback, Object context, boolean isHub) {
- // Validate that the format of the subscriberId is valid either as a
- // local or hub subscriber.
- if (!isValidSubscriberId(subscriberId, isHub)) {
- callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
- "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
- return;
- }
- asyncSubUnsub(topic, subscriberId,
- new VoidCallbackAdapter<ResponseBody>(callback), context,
- OperationType.SUBSCRIBE, options);
- }
-
- public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
- unsubscribe(topic, subscriberId, false);
- }
-
- protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
- throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
- // Validate that the format of the subscriberId is valid either as a
- // local or hub subscriber.
- if (!isValidSubscriberId(subscriberId, isHub)) {
- throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
- + ", isHub: " + isHub);
- }
- // Synchronously close the subscription on the client side. Even
- // if the unsubscribe request to the server errors out, we won't be
- // delivering messages for this subscription to the client. The client
- // can later retry the unsubscribe request to the server so they are
- // "fully" unsubscribed from the given topic.
- closeSubscription(topic, subscriberId);
- try {
- subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
- } catch (ClientAlreadySubscribedException e) {
- logger.error("Unexpected Exception thrown: ", e);
- // This exception should never be thrown here. But just in case,
- // throw a generic ServiceDownException but wrap the original
- // Exception within it.
- throw new ServiceDownException(e);
- }
- }
-
- public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
- final Callback<Void> callback, final Object context) {
- doAsyncUnsubscribe(topic, subscriberId,
- new VoidCallbackAdapter<ResponseBody>(callback),
- context, false);
- }
-
- protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
- final Callback<Void> callback, final Object context, boolean isHub) {
- doAsyncUnsubscribe(topic, subscriberId,
- new VoidCallbackAdapter<ResponseBody>(callback),
- context, isHub);
- }
-
- private void doAsyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
- final Callback<ResponseBody> callback,
- final Object context, boolean isHub) {
- // Validate that the format of the subscriberId is valid either as a
- // local or hub subscriber.
- if (!isValidSubscriberId(subscriberId, isHub)) {
- callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
- "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
- return;
- }
- // Asynchronously close the subscription. On the callback to that
- // operation once it completes, post the async unsubscribe request.
- doAsyncCloseSubscription(topic, subscriberId, new Callback<ResponseBody>() {
- @Override
- public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
- asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- callback.operationFailed(context, exception);
- }
- }, null);
- }
-
- // This is a helper method to determine if a subscriberId is valid as either
- // a hub or local subscriber
- private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
- if ((isHub && !SubscriptionStateUtils.isHubSubscriber(subscriberId))
- || (!isHub && SubscriptionStateUtils.isHubSubscriber(subscriberId)))
- return false;
- else
- return true;
- }
-
- public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
- throws ClientNotSubscribedException {
- TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- logger.debug("Calling consume for {}, messageSeqId: {}.",
- topicSubscriber, messageSeqId);
-
- SubscribeResponseHandler subscribeResponseHandler =
- channelManager.getSubscribeResponseHandler(topicSubscriber);
- // Check that this topic subscription on the client side exists.
- if (null == subscribeResponseHandler ||
- !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
- throw new ClientNotSubscribedException(
- "Cannot send consume message since client is not subscribed to topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
- }
- // Send the consume message to the server using the same subscribe
- // channel that the topic subscription uses.
- subscribeResponseHandler.consume(topicSubscriber, messageSeqId);
- }
-
- public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ServiceDownException {
- // The subscription type of info should be stored on the server end, not
- // the client side. Eventually, the server will have the Subscription
- // Manager part that ties into Zookeeper to manage this info.
- // Commenting out these type of API's related to that here for now until
- // this data is available on the server. Will figure out what the
- // correct way to contact the server to get this info is then.
- // The client side just has soft memory state for client subscription
- // information.
- TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- SubscribeResponseHandler subscribeResponseHandler =
- channelManager.getSubscribeResponseHandler(topicSubscriber);
- return !(null == subscribeResponseHandler ||
- !subscribeResponseHandler.hasSubscription(topicSubscriber));
- }
-
- public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
- ServiceDownException {
- // Same as the previous hasSubscription method, this data should reside
- // on the server end, not the client side.
- return null;
- }
-
- public void startDelivery(final ByteString topic, final ByteString subscriberId,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- logger.debug("Starting delivery for {}.", topicSubscriber);
- channelManager.startDelivery(topicSubscriber, messageHandler);
- }
-
- public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
- MessageHandler messageHandler,
- ClientMessageFilter messageFilter)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- if (null == messageHandler || null == messageFilter) {
- throw new NullPointerException("Null message handler or message filter is provided.");
- }
- TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
- logger.debug("Starting delivery with filter for {}.", topicSubscriber);
- channelManager.startDelivery(topicSubscriber, messageHandler);
- }
-
- public void stopDelivery(final ByteString topic, final ByteString subscriberId)
- throws ClientNotSubscribedException {
- TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- logger.debug("Stopping delivery for {}.", topicSubscriber);
- channelManager.stopDelivery(topicSubscriber);
- }
-
- public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
- PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
- synchronized (pubSubData) {
- PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
- doAsyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
- try {
- while (!pubSubData.isDone)
- pubSubData.wait();
- } catch (InterruptedException e) {
- throw new ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
- }
- // Check from the PubSubCallback if it was successful or not.
- if (!pubSubCallback.getIsCallSuccessful()) {
- throw new ServiceDownException("Exception while trying to close the subscription for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
- }
- }
- }
-
- public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
- final Callback<Void> callback, final Object context) {
- doAsyncCloseSubscription(topic, subscriberId,
- new VoidCallbackAdapter<ResponseBody>(callback), context);
- }
-
- private void doAsyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
- final Callback<ResponseBody> callback, final Object context) {
- TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
- logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
- // We only stop delivery here not in channel manager
- // Because channelManager#asyncCloseSubscription will called
- // when subscription channel disconnected to clear local subscription
- try {
- channelManager.stopDelivery(topicSubscriber);
- } catch (ClientNotSubscribedException cnse) {
- // it is OK to ignore the exception when closing subscription
- }
- logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
- channelManager.asyncCloseSubscription(topicSubscriber, callback, context);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
deleted file mode 100644
index 1d4f955..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.net.InetSocketAddress;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-
-/**
- * Utilities for network operations.
- */
-public class NetUtils {
-
- /**
- * Helper static method to get the String Hostname:Port from a netty
- * Channel. Assumption is that the netty Channel was originally created with
- * an InetSocketAddress. This is true with the Hedwig netty implementation.
- *
- * @param channel
- * Netty channel to extract the hostname and port from.
- * @return String representation of the Hostname:Port from the Netty Channel
- */
- public static InetSocketAddress getHostFromChannel(Channel channel) {
- return (InetSocketAddress) channel.getRemoteAddress();
- }
-
- /**
- * This is a helper method to build the actual pub/sub message.
- *
- * @param txnId
- * Transaction Id.
- * @param pubSubData
- * Publish call's data wrapper object.
- * @return pub sub request to send
- */
- public static PubSubRequest.Builder buildPubSubRequest(long txnId,
- PubSubData pubSubData) {
- // Create a PubSubRequest
- PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
- pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
- pubsubRequestBuilder.setType(pubSubData.operationType);
- // for consume request, we don't need to care about tried servers list
- if (OperationType.CONSUME != pubSubData.operationType) {
- if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
- pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
- }
- }
- pubsubRequestBuilder.setTxnId(txnId);
- pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
- pubsubRequestBuilder.setTopic(pubSubData.topic);
-
- switch (pubSubData.operationType) {
- case PUBLISH:
- // Set the PublishRequest into the outer PubSubRequest
- pubsubRequestBuilder.setPublishRequest(buildPublishRequest(pubSubData));
- break;
- case SUBSCRIBE:
- // Set the SubscribeRequest into the outer PubSubRequest
- pubsubRequestBuilder.setSubscribeRequest(buildSubscribeRequest(pubSubData));
- break;
- case UNSUBSCRIBE:
- // Set the UnsubscribeRequest into the outer PubSubRequest
- pubsubRequestBuilder.setUnsubscribeRequest(buildUnsubscribeRequest(pubSubData));
- break;
- case CLOSESUBSCRIPTION:
- // Set the CloseSubscriptionRequest into the outer PubSubRequest
- pubsubRequestBuilder.setCloseSubscriptionRequest(
- buildCloseSubscriptionRequest(pubSubData));
- break;
- default:
- throw new IllegalArgumentException("Unknown argument type " + pubSubData.operationType);
- }
-
- // Update the PubSubData with the txnId and the requestWriteTime
- pubSubData.txnId = txnId;
- pubSubData.requestWriteTime = System.currentTimeMillis();
-
- return pubsubRequestBuilder;
- }
-
- // build publish request
- private static PublishRequest.Builder buildPublishRequest(PubSubData pubSubData) {
- PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder();
- publishRequestBuilder.setMsg(pubSubData.msg);
- return publishRequestBuilder;
- }
-
- // build subscribe request
- private static SubscribeRequest.Builder buildSubscribeRequest(PubSubData pubSubData) { SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
- subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
- subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
- subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach());
- // For now, all subscribes should wait for all cross-regional
- // subscriptions to be established before returning.
- subscribeRequestBuilder.setSynchronous(true);
- // set subscription preferences
- SubscriptionPreferences.Builder preferencesBuilder =
- options2Preferences(pubSubData.options);
- // backward compatable with 4.1.0
- if (preferencesBuilder.hasMessageBound()) {
- subscribeRequestBuilder.setMessageBound(preferencesBuilder.getMessageBound());
- }
- subscribeRequestBuilder.setPreferences(preferencesBuilder);
- return subscribeRequestBuilder;
- }
-
- // build unsubscribe request
- private static UnsubscribeRequest.Builder buildUnsubscribeRequest(PubSubData pubSubData) {
- // Create the UnSubscribeRequest
- UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder();
- unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
- return unsubscribeRequestBuilder;
- }
-
- // build closesubscription request
- private static CloseSubscriptionRequest.Builder
- buildCloseSubscriptionRequest(PubSubData pubSubData) {
- // Create the CloseSubscriptionRequest
- CloseSubscriptionRequest.Builder closeSubscriptionRequestBuilder =
- CloseSubscriptionRequest.newBuilder();
- closeSubscriptionRequestBuilder.setSubscriberId(pubSubData.subscriberId);
- return closeSubscriptionRequestBuilder;
- }
-
- /**
- * Build consume request
- *
- * @param txnId
- * Transaction Id.
- * @param topicSubscriber
- * Topic Subscriber.
- * @param messageSeqId
- * Message Seq Id.
- * @return pub/sub request.
- */
- public static PubSubRequest.Builder buildConsumeRequest(long txnId,
- TopicSubscriber topicSubscriber,
- MessageSeqId messageSeqId) {
- // Create a PubSubRequest
- PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
- pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
- pubsubRequestBuilder.setType(OperationType.CONSUME);
-
- pubsubRequestBuilder.setTxnId(txnId);
- pubsubRequestBuilder.setTopic(topicSubscriber.getTopic());
-
- // Create the ConsumeRequest
- ConsumeRequest.Builder consumeRequestBuilder = ConsumeRequest.newBuilder();
- consumeRequestBuilder.setSubscriberId(topicSubscriber.getSubscriberId());
- consumeRequestBuilder.setMsgId(messageSeqId);
-
- pubsubRequestBuilder.setConsumeRequest(consumeRequestBuilder);
-
- return pubsubRequestBuilder;
- }
-
- /**
- * Convert client-side subscription options to subscription preferences
- *
- * @param options
- * Client-Side subscription options
- * @return subscription preferences
- */
- private static SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
- // prepare subscription preferences
- SubscriptionPreferences.Builder preferencesBuilder =
- SubscriptionPreferences.newBuilder();
-
- // set message bound
- if (options.getMessageBound() > 0) {
- preferencesBuilder.setMessageBound(options.getMessageBound());
- }
-
- // set message filter
- if (options.hasMessageFilter()) {
- preferencesBuilder.setMessageFilter(options.getMessageFilter());
- }
-
- // set user options
- if (options.hasOptions()) {
- preferencesBuilder.setOptions(options.getOptions());
- }
-
- // set message window size if set
- if (options.hasMessageWindowSize() && options.getMessageWindowSize() > 0) {
- preferencesBuilder.setMessageWindowSize(options.getMessageWindowSize());
- }
-
- return preferencesBuilder;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
deleted file mode 100644
index ffe8661..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.util.SubscriptionListener;
-
-public class SubscriptionEventEmitter {
-
- private final CopyOnWriteArraySet<SubscriptionListener> listeners;
-
- public SubscriptionEventEmitter() {
- listeners = new CopyOnWriteArraySet<SubscriptionListener>();
- }
-
- public void addSubscriptionListener(SubscriptionListener listener) {
- listeners.add(listener);
- }
-
- public void removeSubscriptionListener(SubscriptionListener listener) {
- listeners.remove(listener);
- }
-
- public void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event) {
- for (SubscriptionListener listener : listeners) {
- listener.processEvent(topic, subscriberId, event);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java
deleted file mode 100644
index dc2cf8b..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Adapts from Callback<T> to Callback<Void>. (Ignores the <T> parameter).
- */
-public class VoidCallbackAdapter<T> implements Callback<T> {
- private final Callback<Void> delegate;
-
- public VoidCallbackAdapter(Callback<Void> delegate){
- this.delegate = delegate;
- }
-
- @Override
- public void operationFinished(Object ctx, T resultOfOperation) {
- delegate.operationFinished(ctx, null);
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- delegate.operationFailed(ctx, exception);
- }
-}