You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:34 UTC

[24/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
deleted file mode 100644
index 6b2a965..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import java.net.InetSocketAddress;
-
-
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.util.Callback;
-
-/**
- * An interface provided to manage all subscriptions on a channel.
- *
- * Its responsibility is to handle all subscribe responses received on that channel,
- * clear up subscriptions and retry reconnectin subscriptions when channel disconnected,
- * and handle delivering messages to {@link MessageHandler} and sent consume messages
- * back to hub servers.
- */
-public abstract class SubscribeResponseHandler extends AbstractResponseHandler {
-
-    protected SubscribeResponseHandler(ClientConfiguration cfg,
-                                       HChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    /**
-     * Handle Message delivered by the server.
-     *
-     * @param response
-     *          Message received from the server.
-     */
-    public abstract void handleSubscribeMessage(PubSubResponse response);
-
-    /**
-     * Handle a subscription event delivered by the server.
-     *
-     * @param topic
-     *          Topic Name
-     * @param subscriberId
-     *          Subscriber Id
-     * @param event
-     *          Subscription Event describes its status
-     */
-    public abstract void handleSubscriptionEvent(ByteString topic,
-                                                 ByteString subscriberId,
-                                                 SubscriptionEvent event);
-
-    /**
-     * Method called when a message arrives for a subscribe Channel and we want
-     * to deliver it asynchronously via the registered MessageHandler (should
-     * not be null when called here).
-     *
-     * @param message
-     *            Message from Subscribe Channel we want to consume.
-     */
-    protected abstract void asyncMessageDeliver(TopicSubscriber topicSubscriber,
-                                                Message message);
-
-    /**
-     * Method called when the client app's MessageHandler has asynchronously
-     * completed consuming a subscribed message sent from the server. The
-     * contract with the client app is that messages sent to the handler to be
-     * consumed will have the callback response done in the same order. So if we
-     * asynchronously call the MessageHandler to consume messages #1-5, that
-     * should call the messageConsumed method here via the VoidCallback in the
-     * same order. To make this thread safe, since multiple outstanding messages
-     * could be consumed by the client app and then called back to here, make
-     * this method synchronized.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     * @param message
-     *            Message sent from server for topic subscription that has been
-     *            consumed by the client.
-     */
-    protected abstract void messageConsumed(TopicSubscriber topicSubscriber,
-                                            Message message);
-
-    /**
-     * Start delivering messages for a given topic subscriber.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     * @param messageHandler
-     *            MessageHandler to register for this ResponseHandler instance.
-     * @throws ClientNotSubscribedException
-     *            If the client is not currently subscribed to the topic
-     * @throws AlreadyStartDeliveryException
-     *            If someone started delivery a message handler before stopping existed one.
-     */
-    public abstract void startDelivery(TopicSubscriber topicSubscriber,
-                                       MessageHandler messageHandler)
-    throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
-    /**
-     * Stop delivering messages for a given topic subscriber.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     * @throws ClientNotSubscribedException
-     *             If the client is not currently subscribed to the topic
-     */
-    public abstract void stopDelivery(TopicSubscriber topicSubscriber)
-    throws ClientNotSubscribedException;
-
-    /**
-     * Whether the given topic subscriber subscribed thru this handler.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     * @return whether the given topic subscriber subscribed thru this handler.
-     */
-    public abstract boolean hasSubscription(TopicSubscriber topicSubscriber);
-
-    /**
-     * Close subscription from this handler.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     * @param callback
-     *            Callback when the subscription is closed. 
-     * @param context
-     *            Callback context.
-     */
-    public abstract void asyncCloseSubscription(TopicSubscriber topicSubscriber,
-                                                Callback<ResponseBody> callback,
-                                                Object context);
-
-    /**
-     * Consume a given message for given topic subscriber thru this handler.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     */
-    public abstract void consume(TopicSubscriber topicSubscriber,
-                                 MessageSeqId messageSeqId);
-
-    /**
-     * This method is called when the underlying channel is disconnected due to server failure.
-     *
-     * The implementation should take the responsibility to clear subscriptions and retry
-     * reconnecting subscriptions to new hub servers.
-     *
-     * @param host
-     *          Host that channel connected to has disconnected.
-     * @param channel
-     *          Channel connected to.
-     */
-    public abstract void onChannelDisconnected(InetSocketAddress host,
-                                               Channel channel);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
deleted file mode 100644
index 0d3e8a6..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-
-public class UnsubscribeResponseHandler extends AbstractResponseHandler {
-
-    private static final Logger logger = LoggerFactory.getLogger(UnsubscribeResponseHandler.class);
-
-    public UnsubscribeResponseHandler(ClientConfiguration cfg,
-                                      HChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    @Override
-    public void handleResponse(final PubSubResponse response, final PubSubData pubSubData,
-                               final Channel channel)
-            throws Exception {
-        switch (response.getStatusCode()) {
-        case SUCCESS:
-            // since for unsubscribe request, we close subscription first
-            // for now, we don't need to do anything now.
-            pubSubData.getCallback().operationFinished(pubSubData.context, null);
-            break;
-        case CLIENT_NOT_SUBSCRIBED:
-            // For Unsubscribe requests, the server says that the client was
-            // never subscribed to the topic.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new ClientNotSubscribedException(
-                                                    "Client was never subscribed to topic: " +
-                                                        pubSubData.topic.toStringUtf8() + ", subscriberId: " +
-                                                        pubSubData.subscriberId.toStringUtf8()));
-            break;
-        case SERVICE_DOWN:
-            // Response was service down failure so just invoke the callback's
-            // operationFailed method.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
-                                                    "Server responded with a SERVICE_DOWN status"));
-            break;
-        case NOT_RESPONSIBLE_FOR_TOPIC:
-            // Redirect response so we'll need to repost the original
-            // Unsubscribe Request
-            handleRedirectResponse(response, pubSubData, channel);
-            break;
-        default:
-            // Consider all other status codes as errors, operation failed
-            // cases.
-            logger.error("Unexpected error response from server for PubSubResponse: " + response);
-            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
-                                                    "Server responded with a status code of: " +
-                                                        response.getStatusCode()));
-            break;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
deleted file mode 100644
index a6ba2a6..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CleanupChannelMap<T> {
-
-    private static final Logger logger = LoggerFactory.getLogger(CleanupChannelMap.class);
-    
-    private final ConcurrentHashMap<T, HChannel> channels;
-
-    // Boolean indicating if the channel map is closed or not.
-    protected boolean closed = false;
-    protected final ReentrantReadWriteLock closedLock =
-        new ReentrantReadWriteLock();
-
-    public CleanupChannelMap() {
-        channels = new ConcurrentHashMap<T, HChannel>();
-    }
-
-    /**
-     * Add channel to the map. If an old channel has been bound
-     * to <code>key</code>, the <code>channel</code> would be
-     * closed immediately and the old channel is returned. Otherwise,
-     * the <code>channel</code> is put in the map for future usage.
-     *
-     * If the channel map has been closed, the channel would be closed
-     * immediately.
-     *
-     * @param key
-     *            Key
-     * @param channel
-     *            Channel
-     * @return the channel instance to use.
-     */
-    public HChannel addChannel(T key, HChannel channel) {
-        this.closedLock.readLock().lock();
-        try {
-            if (closed) {
-                channel.close();
-                return channel;
-            }
-            HChannel oldChannel = channels.putIfAbsent(key, channel);
-            if (null != oldChannel) {
-                logger.info("Channel for {} already exists, so no need to store it.", key);
-                channel.close();
-                return oldChannel;
-            } else {
-                logger.debug("Storing a new channel for {}.", key);
-                return channel;
-            }
-        } finally {
-            this.closedLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Replace channel only if currently mapped to the given <code>oldChannel</code>.
-     *
-     * @param key
-     *            Key
-     * @param oldChannel
-     *            Old Channel
-     * @param newChannel
-     *            New Channel
-     * @return true if replaced successfully, otherwise false.
-     */
-    public boolean replaceChannel(T key, HChannel oldChannel, HChannel newChannel) {
-        this.closedLock.readLock().lock();
-        try {
-            if (closed) {
-                if (null != oldChannel) oldChannel.close();
-                if (null != newChannel) newChannel.close();
-                return false;
-            }
-            if (null == oldChannel) {
-                HChannel existedChannel = channels.putIfAbsent(key, newChannel);
-                if (null != existedChannel) {
-                    logger.info("Channel for {} already exists, so no need to replace it.", key);
-                    newChannel.close();
-                    return false;
-                } else {
-                    logger.debug("Storing a new channel for {}.", key);
-                    return true;
-                }
-            } else {
-                if (channels.replace(key, oldChannel, newChannel)) {
-                    logger.debug("Replacd channel {} for {}.", oldChannel, key);
-                    oldChannel.close();
-                    return true;
-                } else {
-                    newChannel.close();
-                    return false;
-                }
-            }
-        } finally {
-            this.closedLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Returns the channel bound with <code>key</code>.
-     *
-     * @param key Key
-     * @return the channel bound with <code>key</code>.
-     */
-    public HChannel getChannel(T key) {
-        return channels.get(key);
-    }
-
-    /**
-     * Remove the channel bound with <code>key</code>.
-     *
-     * @param key Key
-     * @return the channel bound with <code>key</code>, null if no channel
-     *         is bound with <code>key</code>.
-     */
-    public HChannel removeChannel(T key) {
-        return channels.remove(key);
-    }
-
-    /**
-     * Remove the channel bound with <code>key</code>.
-     *
-     * @param key Key
-     * @param channel The channel expected to be bound with <code>key</code>.
-     * @return true if the channel is removed, false otherwise.
-     */
-    public boolean removeChannel(T key, HChannel channel) {
-        return channels.remove(key, channel);
-    }
-
-    /**
-     * Return the channels in the map.
-     *
-     * @return the set of channels.
-     */
-    public Collection<HChannel> getChannels() {
-        return channels.values();
-    }
-
-    /**
-     * Close the channels map.
-     */
-    public void close() {
-        closedLock.writeLock().lock();
-        try {
-            if (closed) {
-                return;
-            }
-            closed = true;
-        } finally {
-            closedLock.writeLock().unlock();
-        }
-        logger.debug("Closing channels map.");
-        for (HChannel channel : channels.values()) {
-            channel.close(true);
-        }
-        channels.clear();
-        logger.debug("Closed channels map.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
deleted file mode 100644
index 94e0a80..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Handlers used by a subscription.
- */
-public class FilterableMessageHandler implements MessageHandler {
-
-    MessageHandler msgHandler;
-    ClientMessageFilter  msgFilter;
-
-    public FilterableMessageHandler(MessageHandler msgHandler,
-                                    ClientMessageFilter msgFilter) {
-        this.msgHandler = msgHandler;
-        this.msgFilter = msgFilter;
-    }
-
-    public boolean hasMessageHandler() {
-        return null != msgHandler;
-    }
-
-    public MessageHandler getMessageHandler() {
-        return msgHandler;
-    }
-
-    public boolean hasMessageFilter() {
-        return null != msgFilter;
-    }
-
-    public ClientMessageFilter getMessageFilter() {
-        return msgFilter;
-    }
-
-    @Override
-    public void deliver(ByteString topic, ByteString subscriberId, Message msg,
-                        Callback<Void> callback, Object context) {
-        boolean deliver = true;
-        if (hasMessageFilter()) {
-            deliver = msgFilter.testMessage(msg);
-        }
-        if (deliver) {
-            msgHandler.deliver(topic, subscriberId, msg, callback, context);
-        } else {
-            callback.operationFinished(context, null);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
deleted file mode 100644
index 340cec5..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import org.jboss.netty.channel.Channel;
-import org.apache.hedwig.client.data.PubSubData;
-
-/**
- * A wrapper interface over netty {@link Channel} to submit hedwig's
- * {@link PubSubData} requests.
- */
-public interface HChannel {
-
-    /**
-     * Submit a pub/sub request.
-     *
-     * @param op
-     *          Pub/Sub Request.
-     */
-    public void submitOp(PubSubData op);
-
-    /**
-     * @return underlying netty channel
-     */
-    public Channel getChannel();
-
-    /**
-     * Close the channel without waiting.
-     */
-    public void close();
-
-    /**
-     * Close the channel
-     *
-     * @param wait
-     *          Whether wait until the channel is closed.
-     */
-    public void close(boolean wait);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
deleted file mode 100644
index 6fae6bb..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.net.InetSocketAddress;
-import java.util.TimerTask;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-
-/**
- * A manager manages 1) all channels established to hub servers,
- * 2) the actions taken by the topic subscribers.
- */
-public interface HChannelManager {
-
-    /**
-     * Submit a pub/sub request after a given <code>delay</code>.
-     *
-     * @param op
-     *          Pub/Sub Request.
-     * @param delay
-     *          Delay time in ms.
-     */
-    public void submitOpAfterDelay(PubSubData op, long delay);
-
-    /**
-     * Submit a pub/sub request.
-     *
-     * @param pubSubData
-     *          Pub/Sub Request.
-     */
-    public void submitOp(PubSubData pubSubData);
-
-    /**
-     * Submit a pub/sub request to default server.
-     *
-     * @param pubSubData
-     *           Pub/Sub request.
-     */
-    public void submitOpToDefaultServer(PubSubData pubSubData);
-
-    /**
-     * Submit a pub/sub request to a given host.
-     *
-     * @param pubSubData
-     *          Pub/Sub request.
-     * @param host
-     *          Given host address.
-     */
-    public void redirectToHost(PubSubData pubSubData, InetSocketAddress host);
-
-    /**
-     * Generate next transaction id for pub/sub request sending thru this manager.
-     *
-     * @return next transaction id.
-     */
-    public long nextTxnId();
-
-    /**
-     * Schedule a timer task after a given <code>delay</code>.
-     *
-     * @param task
-     *          A timer task
-     * @param delay
-     *          Delay time in ms.
-     */
-    public void schedule(TimerTask task, long delay);
-
-    /**
-     * Get the subscribe response handler managed the given <code>topicSubscriber</code>.
-     *
-     * @param topicSubscriber
-     *          Topic Subscriber
-     * @return subscribe response handler managed it, otherwise return null.
-     */
-    public SubscribeResponseHandler getSubscribeResponseHandler(
-                                    TopicSubscriber topicSubscriber);
-
-    /**
-     * Start delivering messages for a given topic subscriber.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     * @param messageHandler
-     *            MessageHandler to register for this ResponseHandler instance.
-     * @throws ClientNotSubscribedException
-     *            If the client is not currently subscribed to the topic
-     * @throws AlreadyStartDeliveryException
-     *            If someone started delivery a message handler before stopping existed one.
-     */
-    public void startDelivery(TopicSubscriber topicSubscriber,
-                              MessageHandler messageHandler)
-    throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
-    /**
-     * Stop delivering messages for a given topic subscriber.
-     *
-     * @param topicSubscriber
-     *            Topic Subscriber
-     * @throws ClientNotSubscribedException
-     *             If the client is not currently subscribed to the topic
-     */
-    public void stopDelivery(TopicSubscriber topicSubscriber)
-    throws ClientNotSubscribedException;
-
-    /**
-     * Close the subscription of the given <code>topicSubscriber</code>.
-     *
-     * @param topicSubscriber
-     *          Topic Subscriber
-     * @param callback
-     *          Callback
-     * @param context
-     *          Callback context
-     */
-    public void asyncCloseSubscription(TopicSubscriber topicSubscriber,
-                                       Callback<ResponseBody> callback,
-                                       Object context);
-
-    /**
-     * Return the subscription event emitter to emit subscription events.
-     *
-     * @return subscription event emitter.
-     */
-    public SubscriptionEventEmitter getSubscriptionEventEmitter();
-
-    /**
-     * Is the channel manager closed.
-     *
-     * @return true if the channel manager is closed, otherwise return false.
-     */
-    public boolean isClosed();
-
-    /**
-     * Close the channel manager.
-     */
-    public void close();
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
deleted file mode 100644
index 8ae0e82..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.concurrent.Executors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager;
-import org.apache.hedwig.client.netty.impl.multiplex.MultiplexHChannelManager;
-
-/**
- * This is a top level Hedwig Client class that encapsulates the common
- * functionality needed for both Publish and Subscribe operations.
- *
- */
-public class HedwigClientImpl implements Client {
-
-    private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
-
-    // The Netty socket factory for making connections to the server.
-    protected final ChannelFactory socketFactory;
-    // Whether the socket factory is one we created or is owned by whoever
-    // instantiated us.
-    protected boolean ownChannelFactory = false;
-
-    // channel manager manages all the channels established by the client
-    protected final HChannelManager channelManager;
-
-    private HedwigSubscriber sub;
-    private final HedwigPublisher pub;
-    private final ClientConfiguration cfg;
-
-    public static Client create(ClientConfiguration cfg) {
-        return new HedwigClientImpl(cfg);
-    }
-
-    public static Client create(ClientConfiguration cfg, ChannelFactory socketFactory) {
-        return new HedwigClientImpl(cfg, socketFactory);
-    }
-
-    // Base constructor that takes in a Configuration object.
-    // This will create its own client socket channel factory.
-    protected HedwigClientImpl(ClientConfiguration cfg) {
-        this(cfg, new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                        .setNameFormat("HedwigClient-NIOBoss-%d").build()),
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                        .setNameFormat("HedwigClient-NIOWorker-%d").build())));
-        ownChannelFactory = true;
-    }
-
-    // Constructor that takes in a Configuration object and a ChannelFactory
-    // that has already been instantiated by the caller.
-    protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
-        this.cfg = cfg;
-        this.socketFactory = socketFactory;
-        if (cfg.isSubscriptionChannelSharingEnabled()) {
-            channelManager = new MultiplexHChannelManager(cfg, socketFactory);
-        } else {
-            channelManager = new SimpleHChannelManager(cfg, socketFactory);
-        }
-        pub = new HedwigPublisher(this);
-        sub = new HedwigSubscriber(this);
-    }
-
-    public ClientConfiguration getConfiguration() {
-        return cfg;
-    }
-
-    public HChannelManager getHChannelManager() {
-        return channelManager;
-    }
-
-    public HedwigSubscriber getSubscriber() {
-        return sub;
-    }
-
-    // Protected method to set the subscriber. This is needed currently for hub
-    // versions of the client subscriber.
-    protected void setSubscriber(HedwigSubscriber sub) {
-        this.sub = sub;
-    }
-
-    public HedwigPublisher getPublisher() {
-        return pub;
-    }
-
-    // When we are done with the client, this is a clean way to gracefully close
-    // all channels/sockets created by the client and to also release all
-    // resources used by netty.
-    public void close() {
-        logger.info("Stopping the client!");
-
-        // close channel manager to release all channels
-        channelManager.close(); 
-
-        // Release resources used by the ChannelFactory on the client if we are
-        // the owner that created it.
-        if (ownChannelFactory) {
-            socketFactory.releaseExternalResources();
-        }
-        logger.info("Completed stopping the client!");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
deleted file mode 100644
index 271d1eb..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.handlers.PubSubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-
-/**
- * This is the Hedwig Netty specific implementation of the Publisher interface.
- *
- */
-public class HedwigPublisher implements Publisher {
-
-    private static final Logger logger = LoggerFactory.getLogger(HedwigPublisher.class);
-
-    private final HChannelManager channelManager;
-
-    protected HedwigPublisher(HedwigClientImpl client) {
-        this.channelManager = client.getHChannelManager();
-    }
-
-    public PublishResponse publish(ByteString topic, Message msg)
-        throws CouldNotConnectException, ServiceDownException {
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Calling a sync publish for topic: {}, msg: {}.",
-                         topic.toStringUtf8(), msg);
-        }
-        PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, null, null);
-        synchronized (pubSubData) {
-            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
-            asyncPublishWithResponseImpl(topic, msg, pubSubCallback, null);
-            try {
-                while (!pubSubData.isDone)
-                    pubSubData.wait();
-            } catch (InterruptedException e) {
-                throw new ServiceDownException("Interrupted Exception while waiting for async publish call");
-            }
-            // Check from the PubSubCallback if it was successful or not.
-            if (!pubSubCallback.getIsCallSuccessful()) {
-                // See what the exception was that was thrown when the operation
-                // failed.
-                PubSubException failureException = pubSubCallback.getFailureException();
-                if (failureException == null) {
-                    // This should not happen as the operation failed but a null
-                    // PubSubException was passed. Log a warning message but
-                    // throw a generic ServiceDownException.
-                    logger.error("Sync Publish operation failed but no PubSubException was passed!");
-                    throw new ServiceDownException("Server ack response to publish request is not successful");
-                }
-                // For the expected exceptions that could occur, just rethrow
-                // them.
-                else if (failureException instanceof CouldNotConnectException) {
-                    throw (CouldNotConnectException) failureException;
-                } else if (failureException instanceof ServiceDownException) {
-                    throw (ServiceDownException) failureException;
-                } else {
-                    // For other types of PubSubExceptions, just throw a generic
-                    // ServiceDownException but log a warning message.
-                    logger.error("Unexpected exception type when a sync publish operation failed: ",
-                                 failureException);
-                    throw new ServiceDownException("Server ack response to publish request is not successful");
-                }
-            }
-
-            ResponseBody respBody = pubSubCallback.getResponseBody();
-            if (null == respBody) {
-                return null;
-            }
-            return respBody.hasPublishResponse() ? respBody.getPublishResponse() : null;
-        }
-    }
-
-    public void asyncPublish(ByteString topic, Message msg,
-                             final Callback<Void> callback, Object context) {
-        asyncPublishWithResponseImpl(topic, msg,
-                                     new VoidCallbackAdapter<ResponseBody>(callback), context);
-    }
-
-    public void asyncPublishWithResponse(ByteString topic, Message msg,
-                                         Callback<PublishResponse> callback,
-                                         Object context) {
-        // adapt the callback.
-        asyncPublishWithResponseImpl(topic, msg,
-                                     new PublishResponseCallbackAdapter(callback), context);
-    }
-
-    private void asyncPublishWithResponseImpl(ByteString topic, Message msg,
-                                              Callback<ResponseBody> callback,
-                                              Object context) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Calling an async publish for topic: {}, msg: {}.",
-                         topic.toStringUtf8(), msg);
-        }
-        PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null,
-                                               callback, context);
-        channelManager.submitOp(pubSubData);
-    }
-
-    private static class PublishResponseCallbackAdapter implements Callback<ResponseBody>{
-
-        private final Callback<PublishResponse> delegate;
-
-        private PublishResponseCallbackAdapter(Callback<PublishResponse> delegate) {
-            this.delegate = delegate;
-        }
-
-        @Override
-        public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
-            if (null == resultOfOperation) {
-                delegate.operationFinished(ctx, null);
-            } else {
-                delegate.operationFinished(ctx, resultOfOperation.getPublishResponse());
-            }
-        }
-
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            delegate.operationFailed(ctx, exception);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
deleted file mode 100644
index 0eda290..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.handlers.PubSubCallback;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.SubscriptionListener;
-
-/**
- * This is the Hedwig Netty specific implementation of the Subscriber interface.
- *
- */
-@SuppressWarnings("deprecation") // so that we can implemented the Deprecated subscribe methods without a warning
-public class HedwigSubscriber implements Subscriber {
-
-    private static final Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
-
-    protected final ClientConfiguration cfg;
-    protected final HChannelManager channelManager;
-
-    public HedwigSubscriber(HedwigClientImpl client) {
-        this.cfg = client.getConfiguration();
-        this.channelManager = client.getHChannelManager();
-    }
-
-    public void addSubscriptionListener(SubscriptionListener listener) {
-        channelManager.getSubscriptionEventEmitter()
-                      .addSubscriptionListener(listener);
-    }
-
-    public void removeSubscriptionListener(SubscriptionListener listener) {
-        channelManager.getSubscriptionEventEmitter()
-                      .removeSubscriptionListener(listener);
-    }
-
-    // Private method that holds the common logic for doing synchronous
-    // Subscribe or Unsubscribe requests. This is for code reuse since these
-    // two flows are very similar. The assumption is that the input
-    // OperationType is either SUBSCRIBE or UNSUBSCRIBE.
-    private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
-                          SubscriptionOptions options)
-            throws CouldNotConnectException, ClientAlreadySubscribedException,
-        ClientNotSubscribedException, ServiceDownException {
-        if (logger.isDebugEnabled()) {
-            StringBuilder debugMsg = new StringBuilder().append("Calling a sync subUnsub request for topic: ")
-                                     .append(topic.toStringUtf8()).append(", subscriberId: ")
-                                     .append(subscriberId.toStringUtf8()).append(", operationType: ")
-                                     .append(operationType);
-            if (null != options) {
-                debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach())
-                        .append(", messageBound: ").append(options.getMessageBound());
-            }
-            logger.debug(debugMsg.toString());
-        }
-        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
-        synchronized (pubSubData) {
-            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
-            asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
-            try {
-                while (!pubSubData.isDone)
-                    pubSubData.wait();
-            } catch (InterruptedException e) {
-                throw new ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
-            }
-            // Check from the PubSubCallback if it was successful or not.
-            if (!pubSubCallback.getIsCallSuccessful()) {
-                // See what the exception was that was thrown when the operation
-                // failed.
-                PubSubException failureException = pubSubCallback.getFailureException();
-                if (failureException == null) {
-                    // This should not happen as the operation failed but a null
-                    // PubSubException was passed. Log a warning message but
-                    // throw a generic ServiceDownException.
-                    logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
-                    throw new ServiceDownException("Server ack response to SubUnsub request is not successful");
-                }
-                // For the expected exceptions that could occur, just rethrow
-                // them.
-                else if (failureException instanceof CouldNotConnectException)
-                    throw (CouldNotConnectException) failureException;
-                else if (failureException instanceof ClientAlreadySubscribedException)
-                    throw (ClientAlreadySubscribedException) failureException;
-                else if (failureException instanceof ClientNotSubscribedException)
-                    throw (ClientNotSubscribedException) failureException;
-                else if (failureException instanceof ServiceDownException)
-                    throw (ServiceDownException) failureException;
-                else {
-                    logger.error("Unexpected PubSubException thrown: ", failureException);
-                    // Throw a generic ServiceDownException but wrap the
-                    // original PubSubException within it.
-                    throw new ServiceDownException(failureException);
-                }
-            }
-        }
-    }
-
-    // Private method that holds the common logic for doing asynchronous
-    // Subscribe or Unsubscribe requests. This is for code reuse since these two
-    // flows are very similar. The assumption is that the input OperationType is
-    // either SUBSCRIBE or UNSUBSCRIBE.
-    private void asyncSubUnsub(ByteString topic, ByteString subscriberId,
-                               Callback<ResponseBody> callback, Object context,
-                               OperationType operationType, SubscriptionOptions options) {
-        if (logger.isDebugEnabled()) {
-            StringBuilder debugMsg = new StringBuilder().append("Calling a async subUnsub request for topic: ")
-                                     .append(topic.toStringUtf8()).append(", subscriberId: ")
-                                     .append(subscriberId.toStringUtf8()).append(", operationType: ")
-                                     .append(operationType);
-            if (null != options) {
-                debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach())
-                        .append(", messageBound: ").append(options.getMessageBound());
-            }
-            logger.debug(debugMsg.toString());
-        }
-        if (OperationType.SUBSCRIBE.equals(operationType)) {
-            if (options.getMessageBound() <= 0 &&
-                cfg.getSubscriptionMessageBound() > 0) {
-                SubscriptionOptions.Builder soBuilder =
-                    SubscriptionOptions.newBuilder(options).setMessageBound(
-                        cfg.getSubscriptionMessageBound());
-                options = soBuilder.build();
-            }
-        }
-        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType,
-                                               options, callback, context);
-        channelManager.submitOp(pubSubData);
-    }
-
-    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
-            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-        InvalidSubscriberIdException {
-        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
-        subscribe(topic, subscriberId, options, false);
-    }
-
-    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
-            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-         InvalidSubscriberIdException {
-        subscribe(topic, subscriberId, options, false);
-    }
-
-    protected void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options, boolean isHub)
-            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-        InvalidSubscriberIdException {
-        // Validate that the format of the subscriberId is valid either as a
-        // local or hub subscriber.
-        if (!isValidSubscriberId(subscriberId, isHub)) {
-            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
-                                                   + ", isHub: " + isHub);
-        }
-        try {
-            subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
-        } catch (ClientNotSubscribedException e) {
-            logger.error("Unexpected Exception thrown: ", e);
-            // This exception should never be thrown here. But just in case,
-            // throw a generic ServiceDownException but wrap the original
-            // Exception within it.
-            throw new ServiceDownException(e);
-        }
-    }
-
-    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
-                               Object context) {
-        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
-        asyncSubscribe(topic, subscriberId, options, callback, context, false);
-    }
-
-    public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
-                               Callback<Void> callback, Object context) {
-        asyncSubscribe(topic, subscriberId, options, callback, context, false);
-    }
-
-    protected void asyncSubscribe(ByteString topic, ByteString subscriberId,
-                                  SubscriptionOptions options,
-                                  Callback<Void> callback, Object context, boolean isHub) {
-        // Validate that the format of the subscriberId is valid either as a
-        // local or hub subscriber.
-        if (!isValidSubscriberId(subscriberId, isHub)) {
-            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
-                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
-            return;
-        }
-        asyncSubUnsub(topic, subscriberId,
-                      new VoidCallbackAdapter<ResponseBody>(callback), context,
-                      OperationType.SUBSCRIBE, options);
-    }
-
-    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-        ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
-        unsubscribe(topic, subscriberId, false);
-    }
-
-    protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
-            throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
-        InvalidSubscriberIdException {
-        // Validate that the format of the subscriberId is valid either as a
-        // local or hub subscriber.
-        if (!isValidSubscriberId(subscriberId, isHub)) {
-            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
-                                                   + ", isHub: " + isHub);
-        }
-        // Synchronously close the subscription on the client side. Even
-        // if the unsubscribe request to the server errors out, we won't be
-        // delivering messages for this subscription to the client. The client
-        // can later retry the unsubscribe request to the server so they are
-        // "fully" unsubscribed from the given topic.
-        closeSubscription(topic, subscriberId);
-        try {
-            subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
-        } catch (ClientAlreadySubscribedException e) {
-            logger.error("Unexpected Exception thrown: ", e);
-            // This exception should never be thrown here. But just in case,
-            // throw a generic ServiceDownException but wrap the original
-            // Exception within it.
-            throw new ServiceDownException(e);
-        }
-    }
-
-    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
-                                 final Callback<Void> callback, final Object context) {
-        doAsyncUnsubscribe(topic, subscriberId,
-                           new VoidCallbackAdapter<ResponseBody>(callback),
-                           context, false);
-    }
-
-    protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
-                                    final Callback<Void> callback, final Object context, boolean isHub) {
-        doAsyncUnsubscribe(topic, subscriberId,
-                           new VoidCallbackAdapter<ResponseBody>(callback),
-                           context, isHub);
-    }
-
-    private void doAsyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
-                                    final Callback<ResponseBody> callback,
-                                    final Object context, boolean isHub) {
-        // Validate that the format of the subscriberId is valid either as a
-        // local or hub subscriber.
-        if (!isValidSubscriberId(subscriberId, isHub)) {
-            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
-                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
-            return;
-        }
-        // Asynchronously close the subscription. On the callback to that
-        // operation once it completes, post the async unsubscribe request.
-        doAsyncCloseSubscription(topic, subscriberId, new Callback<ResponseBody>() {
-            @Override
-            public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
-                asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                callback.operationFailed(context, exception);
-            }
-        }, null);
-    }
-
-    // This is a helper method to determine if a subscriberId is valid as either
-    // a hub or local subscriber
-    private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
-        if ((isHub && !SubscriptionStateUtils.isHubSubscriber(subscriberId))
-                || (!isHub && SubscriptionStateUtils.isHubSubscriber(subscriberId)))
-            return false;
-        else
-            return true;
-    }
-
-    public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
-            throws ClientNotSubscribedException {
-        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        logger.debug("Calling consume for {}, messageSeqId: {}.",
-                     topicSubscriber, messageSeqId);
-
-        SubscribeResponseHandler subscribeResponseHandler =
-            channelManager.getSubscribeResponseHandler(topicSubscriber);
-        // Check that this topic subscription on the client side exists.
-        if (null == subscribeResponseHandler ||
-            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
-            throw new ClientNotSubscribedException(
-                "Cannot send consume message since client is not subscribed to topic: "
-                + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
-        }
-        // Send the consume message to the server using the same subscribe
-        // channel that the topic subscription uses.
-        subscribeResponseHandler.consume(topicSubscriber, messageSeqId);
-    }
-
-    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-        ServiceDownException {
-        // The subscription type of info should be stored on the server end, not
-        // the client side. Eventually, the server will have the Subscription
-        // Manager part that ties into Zookeeper to manage this info.
-        // Commenting out these type of API's related to that here for now until
-        // this data is available on the server. Will figure out what the
-        // correct way to contact the server to get this info is then.
-        // The client side just has soft memory state for client subscription
-        // information.
-        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        SubscribeResponseHandler subscribeResponseHandler =
-            channelManager.getSubscribeResponseHandler(topicSubscriber);
-        return !(null == subscribeResponseHandler ||
-                 !subscribeResponseHandler.hasSubscription(topicSubscriber));
-    }
-
-    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
-        ServiceDownException {
-        // Same as the previous hasSubscription method, this data should reside
-        // on the server end, not the client side.
-        return null;
-    }
-
-    public void startDelivery(final ByteString topic, final ByteString subscriberId,
-                              MessageHandler messageHandler)
-            throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        logger.debug("Starting delivery for {}.", topicSubscriber);
-        channelManager.startDelivery(topicSubscriber, messageHandler); 
-    }
-
-    public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
-                                        MessageHandler messageHandler,
-                                        ClientMessageFilter messageFilter)
-            throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        if (null == messageHandler || null == messageFilter) {
-            throw new NullPointerException("Null message handler or message filter is       provided.");
-        }
-        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
-        logger.debug("Starting delivery with filter for {}.", topicSubscriber);
-        channelManager.startDelivery(topicSubscriber, messageHandler);
-    }
-
-    public void stopDelivery(final ByteString topic, final ByteString subscriberId)
-    throws ClientNotSubscribedException {
-        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        logger.debug("Stopping delivery for {}.", topicSubscriber);
-        channelManager.stopDelivery(topicSubscriber); 
-    }
-
-    public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
-        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
-        synchronized (pubSubData) {
-            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
-            doAsyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
-            try {
-                while (!pubSubData.isDone)
-                    pubSubData.wait();
-            } catch (InterruptedException e) {
-                throw new ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
-            }
-            // Check from the PubSubCallback if it was successful or not.
-            if (!pubSubCallback.getIsCallSuccessful()) {
-                throw new ServiceDownException("Exception while trying to close the subscription for topic: "
-                                               + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
-            }
-        }
-    }
-
-    public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
-                                       final Callback<Void> callback, final Object context) {
-        doAsyncCloseSubscription(topic, subscriberId,
-                                 new VoidCallbackAdapter<ResponseBody>(callback), context);
-    }
-
-    private void doAsyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
-                                          final Callback<ResponseBody> callback, final Object context) {
-        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
-        // We only stop delivery here not in channel manager
-        // Because channelManager#asyncCloseSubscription will called
-        // when subscription channel disconnected to clear local subscription
-        try {
-            channelManager.stopDelivery(topicSubscriber); 
-        } catch (ClientNotSubscribedException cnse) {
-            // it is OK to ignore the exception when closing subscription
-        }
-        logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
-        channelManager.asyncCloseSubscription(topicSubscriber, callback, context);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
deleted file mode 100644
index 1d4f955..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.net.InetSocketAddress;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-
-/**
- * Utilities for network operations.
- */
-public class NetUtils {
-
-    /**
-     * Helper static method to get the String Hostname:Port from a netty
-     * Channel. Assumption is that the netty Channel was originally created with
-     * an InetSocketAddress. This is true with the Hedwig netty implementation.
-     *
-     * @param channel
-     *            Netty channel to extract the hostname and port from.
-     * @return String representation of the Hostname:Port from the Netty Channel
-     */
-    public static InetSocketAddress getHostFromChannel(Channel channel) {
-        return (InetSocketAddress) channel.getRemoteAddress();
-    }
-
-    /**
-     * This is a helper method to build the actual pub/sub message.
-     *
-     * @param txnId
-     *            Transaction Id.
-     * @param pubSubData
-     *            Publish call's data wrapper object.
-     * @return pub sub request to send
-     */
-    public static PubSubRequest.Builder buildPubSubRequest(long txnId,
-                                                           PubSubData pubSubData) {
-        // Create a PubSubRequest
-        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
-        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
-        pubsubRequestBuilder.setType(pubSubData.operationType);
-        // for consume request, we don't need to care about tried servers list
-        if (OperationType.CONSUME != pubSubData.operationType) {
-            if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
-                pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
-            }
-        }
-        pubsubRequestBuilder.setTxnId(txnId);
-        pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
-        pubsubRequestBuilder.setTopic(pubSubData.topic);
-
-        switch (pubSubData.operationType) {
-        case PUBLISH:
-            // Set the PublishRequest into the outer PubSubRequest
-            pubsubRequestBuilder.setPublishRequest(buildPublishRequest(pubSubData));
-            break;
-        case SUBSCRIBE:
-            // Set the SubscribeRequest into the outer PubSubRequest
-            pubsubRequestBuilder.setSubscribeRequest(buildSubscribeRequest(pubSubData));
-            break;
-        case UNSUBSCRIBE:
-            // Set the UnsubscribeRequest into the outer PubSubRequest
-            pubsubRequestBuilder.setUnsubscribeRequest(buildUnsubscribeRequest(pubSubData));
-            break;
-        case CLOSESUBSCRIPTION:
-            // Set the CloseSubscriptionRequest into the outer PubSubRequest
-            pubsubRequestBuilder.setCloseSubscriptionRequest(
-                buildCloseSubscriptionRequest(pubSubData));
-            break;
-        default:
-            throw new IllegalArgumentException("Unknown argument type " + pubSubData.operationType);
-        }
-
-        // Update the PubSubData with the txnId and the requestWriteTime
-        pubSubData.txnId = txnId;
-        pubSubData.requestWriteTime = System.currentTimeMillis();
-
-        return pubsubRequestBuilder;
-    }
-
-    // build publish request
-    private static PublishRequest.Builder buildPublishRequest(PubSubData pubSubData) {
-        PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder();
-        publishRequestBuilder.setMsg(pubSubData.msg);
-        return publishRequestBuilder;
-    }
-
-    // build subscribe request
-    private static SubscribeRequest.Builder buildSubscribeRequest(PubSubData pubSubData) { SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder();
-        subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-        subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
-        subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach());
-        // For now, all subscribes should wait for all cross-regional
-        // subscriptions to be established before returning.
-        subscribeRequestBuilder.setSynchronous(true);
-        // set subscription preferences
-        SubscriptionPreferences.Builder preferencesBuilder =
-            options2Preferences(pubSubData.options);
-        // backward compatable with 4.1.0
-        if (preferencesBuilder.hasMessageBound()) {
-            subscribeRequestBuilder.setMessageBound(preferencesBuilder.getMessageBound());
-        } 
-        subscribeRequestBuilder.setPreferences(preferencesBuilder);
-        return subscribeRequestBuilder;
-    }
-
-    // build unsubscribe request
-    private static UnsubscribeRequest.Builder buildUnsubscribeRequest(PubSubData pubSubData) {
-        // Create the UnSubscribeRequest
-        UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder();
-        unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-        return unsubscribeRequestBuilder;
-    }
-
-    // build closesubscription request
-    private static CloseSubscriptionRequest.Builder
-        buildCloseSubscriptionRequest(PubSubData pubSubData) {
-        // Create the CloseSubscriptionRequest
-        CloseSubscriptionRequest.Builder closeSubscriptionRequestBuilder =
-            CloseSubscriptionRequest.newBuilder();
-        closeSubscriptionRequestBuilder.setSubscriberId(pubSubData.subscriberId);
-        return closeSubscriptionRequestBuilder;
-    }
-
-    /**
-     * Build consume request
-     *
-     * @param txnId
-     *          Transaction Id.
-     * @param topicSubscriber
-     *          Topic Subscriber.
-     * @param messageSeqId
-     *          Message Seq Id.
-     * @return pub/sub request.
-     */
-    public static PubSubRequest.Builder buildConsumeRequest(long txnId,
-                                                            TopicSubscriber topicSubscriber,
-                                                            MessageSeqId messageSeqId) {
-        // Create a PubSubRequest
-        PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder();
-        pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE);
-        pubsubRequestBuilder.setType(OperationType.CONSUME);
-
-        pubsubRequestBuilder.setTxnId(txnId);
-        pubsubRequestBuilder.setTopic(topicSubscriber.getTopic());
-
-        // Create the ConsumeRequest
-        ConsumeRequest.Builder consumeRequestBuilder = ConsumeRequest.newBuilder();
-        consumeRequestBuilder.setSubscriberId(topicSubscriber.getSubscriberId());
-        consumeRequestBuilder.setMsgId(messageSeqId);
-
-        pubsubRequestBuilder.setConsumeRequest(consumeRequestBuilder);
-
-        return pubsubRequestBuilder;
-    }
-
-    /**
-     * Convert client-side subscription options to subscription preferences
-     *
-     * @param options
-     *          Client-Side subscription options
-     * @return subscription preferences
-     */
-    private static SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
-        // prepare subscription preferences
-        SubscriptionPreferences.Builder preferencesBuilder =
-            SubscriptionPreferences.newBuilder();
-
-        // set message bound
-        if (options.getMessageBound() > 0) {
-            preferencesBuilder.setMessageBound(options.getMessageBound());
-        }
-
-        // set message filter
-        if (options.hasMessageFilter()) {
-            preferencesBuilder.setMessageFilter(options.getMessageFilter());
-        }
-
-        // set user options
-        if (options.hasOptions()) {
-            preferencesBuilder.setOptions(options.getOptions());
-        }
-
-        // set message window size if set
-        if (options.hasMessageWindowSize() && options.getMessageWindowSize() > 0) {
-            preferencesBuilder.setMessageWindowSize(options.getMessageWindowSize());
-        }
-
-        return preferencesBuilder;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
deleted file mode 100644
index ffe8661..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.util.SubscriptionListener;
-
-public class SubscriptionEventEmitter {
-
-    private final CopyOnWriteArraySet<SubscriptionListener> listeners;
-
-    public SubscriptionEventEmitter() {
-        listeners = new CopyOnWriteArraySet<SubscriptionListener>();
-    }
-
-    public void addSubscriptionListener(SubscriptionListener listener) {
-        listeners.add(listener); 
-    }
-
-    public void removeSubscriptionListener(SubscriptionListener listener) {
-        listeners.remove(listener);
-    }
-
-    public void emitSubscriptionEvent(ByteString topic, ByteString subscriberId,
-                                      SubscriptionEvent event) {
-        for (SubscriptionListener listener : listeners) {
-            listener.processEvent(topic, subscriberId, event);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java
deleted file mode 100644
index dc2cf8b..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Adapts from Callback&lt;T> to Callback&lt;Void>. (Ignores the &lt;T> parameter).
- */
-public class VoidCallbackAdapter<T> implements Callback<T> {
-    private final Callback<Void> delegate;
-
-    public VoidCallbackAdapter(Callback<Void> delegate){
-        this.delegate = delegate;
-    }
-
-    @Override
-    public void operationFinished(Object ctx, T resultOfOperation) {
-        delegate.operationFinished(ctx, null);
-    }
-
-    @Override
-    public void operationFailed(Object ctx, PubSubException exception) {
-        delegate.operationFailed(ctx, exception);
-    }
-}