You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:32 UTC

[22/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
deleted file mode 100644
index fd58747..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
-import java.util.LinkedList;
-import java.util.Queue;
-
-import com.google.protobuf.ByteString;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import static org.apache.hedwig.util.VarArgs.va;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wraps a netty channel for Hedwig operations; ops submitted while not connected are queued.
- */
-public class HChannelImpl implements HChannel {
-
-    private static Logger logger = LoggerFactory.getLogger(HChannelImpl.class);
-
-    enum State {
-        DISCONNECTED, // set by the channel-less constructor and after a failed connect
-        CONNECTING,   // set by connect() while an attempt is in flight
-        CONNECTED,    // set by the channel constructor and after a successful connect
-    };
-
-    InetSocketAddress host; // replaced with the channel's host in onChannelConnected
-    final AbstractHChannelManager channelManager;
-    final ClientChannelPipelineFactory pipelineFactory;
-    volatile Channel channel;
-    volatile State state;
-
-    // Set to true once by close(); checked before writes and in the connect callbacks.
-    volatile boolean closed = false;
-    // Requests queued while the channel is not connected; added to and swapped out under the instance lock.
-    Queue<PubSubData> pendingOps = new ArrayDeque<PubSubData>();
-
-    /**
-     * Create an un-established channel with provided target <code>host</code>.
-     * No pipeline factory is supplied (null is passed on); the state starts as DISCONNECTED.
-     * @param host
-     *          Target host address.
-     * @param channelManager
-     *          Channel manager manages the channels.
-     */
-    protected HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager) {
-        this(host, channelManager, null);
-    }
-
-    public HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager,
-                        ClientChannelPipelineFactory pipelineFactory) {
-        this(host, null, channelManager, pipelineFactory);
-        state = State.DISCONNECTED; // the delegate sets CONNECTED; there is no channel yet, so reset
-    }
-
-    /**
-     * Create a <code>HChannel</code> with an established netty channel.
-     * The state is set to CONNECTED; <code>pipelineFactory</code> is kept for later connect attempts.
-     * @param host
-     *          Target host address.
-     * @param channel
-     *          Established Netty channel.
-     * @param channelManager
-     *          Channel manager manages the channels.
-     */
-    public HChannelImpl(InetSocketAddress host, Channel channel,
-                        AbstractHChannelManager channelManager,
-                        ClientChannelPipelineFactory pipelineFactory) {
-        this.host = host;
-        this.channel = channel;
-        this.channelManager = channelManager;
-        this.pipelineFactory = pipelineFactory;
-        state = State.CONNECTED;
-    }
-
-    @Override
-    public void submitOp(PubSubData pubSubData) {
-        boolean doOpNow = false;
-
-        // common case without lock first
-        if (null != channel && State.CONNECTED == state) {
-            doOpNow = true;
-        } else {
-            synchronized (this) {
-                // check channel & state again under lock
-                if (null != channel && State.CONNECTED == state) {
-                    doOpNow = true;
-                } else {
-                    // if reached here, channel is either null (first connection attempt),
-                    // or the channel is disconnected. Queue up this op; it is executed when
-                    // a connection attempt succeeds, or retried/failed when the attempt
-                    // fails (see onChannelConnected / onChannelConnectFailure).
-                    pendingOps.add(pubSubData);
-                }
-            }
-            if (!doOpNow) {
-                // start connection attempt to server (returns at once if already connecting or connected)
-                connect();
-            }
-        }
-        if (doOpNow) {
-            executeOpAfterConnected(pubSubData); 
-        }
-    }
-
-    /**
-     * Execute pub/sub operation after the underlying channel is connected.
-     * Builds the request with the channel manager's next txn id and writes it.
-     * @param pubSubData
-     *          Pub/Sub Operation
-     */
-    private void executeOpAfterConnected(PubSubData pubSubData) {
-        PubSubRequest.Builder reqBuilder =
-            NetUtils.buildPubSubRequest(channelManager.nextTxnId(), pubSubData);
-        writePubSubRequest(pubSubData, reqBuilder.build());
-    }
-
-    @Override
-    public Channel getChannel() {
-        return channel;
-    }
-
-    private void writePubSubRequest(PubSubData pubSubData, PubSubRequest pubSubRequest) {
-        if (closed || null == channel || State.CONNECTED != state) {
-            retryOrFailOp(pubSubData);
-            return;
-        }
-
-        // Before we do the write, store this information into the
-        // ResponseHandler so when the server responds, we know what
-        // appropriate Callback Data to invoke for the given txn ID.
-        try {
-            getHChannelHandlerFromChannel(channel)
-                .addTxn(pubSubData.txnId, pubSubData);
-        } catch (NoResponseHandlerException nrhe) {
-            logger.warn("No Channel Handler found for channel {} when writing request."
-                        + " It might already disconnect.", channel);
-            return; // NOTE(review): no callback is invoked for the op on this path
-        }
-
-        // Finally, write the pub/sub request through the Channel.
-        logger.debug("Writing a {} request to host: {} for pubSubData: {}.",
-                     va(pubSubData.operationType, host, pubSubData));
-        ChannelFuture future = channel.write(pubSubRequest);
-        future.addListener(new WriteCallback(pubSubData, channelManager));
-    }
-
-    /**
-     * Re-submit operation to default server or fail it.
-     * It is failed if this host is already in its connectFailedServers list, else the host is recorded first.
-     * @param pubSubData
-     *          Pub/Sub Operation
-     */
-    protected void retryOrFailOp(PubSubData pubSubData) {
-        // if we were not able to connect to the host, it could be down
-        ByteString hostString = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
-        if (pubSubData.connectFailedServers != null &&
-            pubSubData.connectFailedServers.contains(hostString)) {
-            // We've already tried to connect to this host before so just
-            // invoke the operationFailed callback.
-            logger.error("Error connecting to host {} more than once so fail the request: {}",
-                         va(host, pubSubData));
-            pubSubData.getCallback().operationFailed(pubSubData.context,
-                new CouldNotConnectException("Could not connect to host: " + host));
-        } else {
-            logger.error("Retry to connect to default hub server again for pubSubData: {}",
-                         pubSubData);
-            // Keep track of this current server that we failed to connect
-            // to but retry the request on the default server host/VIP.
-            if (pubSubData.connectFailedServers == null) {
-                pubSubData.connectFailedServers = new LinkedList<ByteString>();
-            }
-            pubSubData.connectFailedServers.add(hostString);
-            channelManager.submitOpToDefaultServer(pubSubData);
-        }
-    }
-
-    private void onChannelConnected(ChannelFuture future) {
-        Queue<PubSubData> oldPendingOps;
-        synchronized (this) {
-            // closed by the client in the meantime: close the new channel; pendingOps is left untouched
-            if (closed) {
-                future.getChannel().close();
-                return;
-            }
-            state = State.CONNECTED;
-            channel = future.getChannel();
-            host = NetUtils.getHostFromChannel(channel);
-            oldPendingOps = pendingOps;
-            pendingOps = new ArrayDeque<PubSubData>();
-        }
-        for (PubSubData op : oldPendingOps) { // executed after the lock is released
-            executeOpAfterConnected(op);
-        }
-    }
-
-    private void onChannelConnectFailure() {
-        Queue<PubSubData> oldPendingOps;
-        synchronized (this) {
-            state = State.DISCONNECTED;
-            channel = null;
-            oldPendingOps = pendingOps;
-            pendingOps = new ArrayDeque<PubSubData>();
-        }
-        for (PubSubData op : oldPendingOps) { // retried or failed after the lock is released
-            retryOrFailOp(op);
-        }
-    }
-
-    private void connect() {
-        synchronized (this) {
-            if (State.CONNECTING == state ||
-                State.CONNECTED == state) {
-                return; // an attempt is already in flight or has already succeeded
-            }
-            state = State.CONNECTING;
-        }
-        // Start the connection attempt to the input server host.
-        ChannelFuture future = connect(host, pipelineFactory);
-        future.addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                // If the channel has been closed, there is no need to proceed with any
-                // callback logic here.
-                if (closed) {
-                    future.getChannel().close();
-                    return;
-                }
-
-                if (!future.isSuccess()) {
-                    logger.error("Error connecting to host {}.", host);
-                    future.getChannel().close();
-
-                    // if we were not able to connect to the host, it could be down.
-                    onChannelConnectFailure();
-                    return;
-                }
-                logger.debug("Connected to server {}.", host);
-                // Now that we have connected successfully to the server, execute all queueing
-                // requests.
-                onChannelConnected(future);
-            }
-
-        });
-    }
-
-    /**
-     * This is a helper method to do the connect attempt to the server given the
-     * inputted host/port. This can be used to connect to the default server
-     * host/port which is the VIP. That will pick a server in the cluster at
-     * random to connect to for the initial PubSub attempt (with redirect logic
-     * being done at the server side). Additionally, this could be called after
-     * the client makes an initial PubSub attempt at a server, and is redirected
-     * to the one that is responsible for the topic. Once the connect to the
-     * server is done, we will perform the corresponding PubSub write on that
-     * channel.
-     * A new ClientBootstrap is built per call, with tcpNoDelay and keepAlive enabled.
-     * @param serverHost
-     *            Input server host to connect to of type InetSocketAddress
-     * @param pipelineFactory
-     *            PipelineFactory to create response handler to handle responses from
-     *            underlying channel.
-     */
-    protected ChannelFuture connect(InetSocketAddress serverHost,
-                                    ClientChannelPipelineFactory pipelineFactory) {
-        logger.debug("Connecting to host {} ...", serverHost);
-        // Set up the ClientBootStrap so we can create a new Channel connection
-        // to the server.
-        ClientBootstrap bootstrap = new ClientBootstrap(channelManager.getChannelFactory());
-        bootstrap.setPipelineFactory(pipelineFactory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-
-        // Start the connection attempt to the input server host.
-        return bootstrap.connect(serverHost);
-    }
-
-    @Override
-    public void close(boolean wait) {
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            closed = true;
-        }
-        if (null == channel) { // no underlying channel to tear down
-            return;
-        }
-        try {
-            getHChannelHandlerFromChannel(channel).closeExplicitly();
-        } catch (NoResponseHandlerException nrhe) {
-            logger.warn("No channel handler found for channel {} when closing it.",
-                        channel);
-        }
-        if (wait) { // block until the netty close future completes
-            channel.close().awaitUninterruptibly();
-        } else {
-            channel.close();
-        }
-        channel = null;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[HChannel: host - ").append(host)
-          .append(", channel - ").append(channel)
-          .append(", pending reqs - ").append(pendingOps.size())
-          .append(", closed - ").append(closed).append("]");
-        return sb.toString();
-    }
-
-    @Override
-    public void close() {
-        close(false);
-    }
-
-    /**
-     * Helper static method to get the HChannelHandler (response handler) of a Channel
-     * via the ChannelPipeline it is associated with. The assumption is that the
-     * last ChannelHandler tied to the ChannelPipeline is the HChannelHandler.
-     *
-     * @param channel Channel we are retrieving the handler instance for
-     * @return handler instance that is last in the Channel's pipeline
-     * @throws NoResponseHandlerException if the channel or its last handler is null
-     */
-    public static HChannelHandler getHChannelHandlerFromChannel(Channel channel)
-    throws NoResponseHandlerException {
-        if (null == channel) {
-            throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
-        }
-
-        HChannelHandler handler = (HChannelHandler) channel.getPipeline().getLast();
-        if (null == handler) {
-            throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
-        }
-        return handler;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
deleted file mode 100644
index a91bbf8..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.PublishResponseHandler;
-import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-public class NonSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
-
-    public NonSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
-                                                 AbstractHChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    /** Builds the handler table for this pipeline: PUBLISH and UNSUBSCRIBE responses only. */
-    @Override
-    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
-        Map<OperationType, AbstractResponseHandler> byOperation =
-            new HashMap<OperationType, AbstractResponseHandler>();
-        AbstractResponseHandler publishHandler = new PublishResponseHandler(cfg, channelManager);
-        byOperation.put(OperationType.PUBLISH, publishHandler);
-        AbstractResponseHandler unsubscribeHandler = new UnsubscribeResponseHandler(cfg, channelManager);
-        byOperation.put(OperationType.UNSUBSCRIBE, unsubscribeHandler);
-        return byOperation;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
deleted file mode 100644
index 17d2401..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ResubscribeException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * This class is used when a Subscribe channel gets disconnected and we attempt
- * to resubmit subscribe requests existed in that channel. Once the resubscribe
- * the topic is completed, we need to restart delivery for that topic.
- */
-class ResubscribeCallback implements Callback<ResponseBody> {
-
-    private static final Logger logger = LoggerFactory.getLogger(ResubscribeCallback.class);
-
-    // Subscriber and original subscribe op being resubmitted, the manager used to resubmit, and the retry delay (ms).
-    private final TopicSubscriber origTopicSubscriber;
-    private final PubSubData origSubData;
-    private final AbstractHChannelManager channelManager;
-    private final long retryWaitTime;
-
-    /** Stores the collaborators; <code>retryWaitTime</code> is the delay in ms before a resubmit. */
-    ResubscribeCallback(TopicSubscriber origTopicSubscriber,
-                        PubSubData origSubData,
-                        AbstractHChannelManager channelManager,
-                        long retryWaitTime) {
-        this.origTopicSubscriber = origTopicSubscriber;
-        this.origSubData = origSubData;
-        this.channelManager = channelManager;
-        this.retryWaitTime = retryWaitTime;
-    }
-
-    /** Resubscribe succeeded: restart delivery; retry the subscribe on ClientNotSubscribedException. */
-    @Override
-    public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
-        logger.debug("Resubscribe succeeded for origSubData: {}", origSubData);
-        // Now we want to restart delivery for the subscription channel only if delivery
-        // was started at the time the original subscribe channel was disconnected.
-        try {
-            channelManager.restartDelivery(origTopicSubscriber);
-        } catch (ClientNotSubscribedException e) {
-            // This exception should never be thrown here but just in case,
-            // log an error and just keep retrying the subscribe request.
-            logger.error("Subscribe was successful but error starting delivery for {} : {}",
-                         va(origTopicSubscriber, e.getMessage()));
-            retrySubscribeRequest();
-        } catch (AlreadyStartDeliveryException asde) {
-            // should not reach here
-        }
-    }
-
-    /** Resubscribe failed: retry after a delay, except for a ResubscribeException or a closed manager. */
-    @Override
-    public void operationFailed(Object ctx, PubSubException exception) {
-        if (exception instanceof ResubscribeException) {
-            // it might be caused by closesub when resubscribing.
-            // so we don't need to retry resubscribe again
-            logger.warn("Failed to resubscribe {} : but it is caused by closesub when resubscribing. "
-                        + "so we don't need to retry subscribe again.", origSubData);
-            return;
-        }
-        // For any other failure keep retrying in the background until success: there
-        // isn't a way to flag a failed topic subscription to the application layer.
-        logger.error("Resubscribe failed with error: " + exception.getMessage());
-        // we don't retry the subscribe request if the channel manager is closing,
-        // otherwise it might overflow the stack.
-        if (!channelManager.isClosed()) {
-            retrySubscribeRequest();
-        }
-    }
-
-    /** Calls clearServersList() on the op and resubmits it after the retry delay, unless the manager is closed. */
-    private void retrySubscribeRequest() {
-        if (channelManager.isClosed()) {
-            return;
-        }
-        origSubData.clearServersList();
-        logger.debug("Resubmit subscribe request for {} in {} ms later.",
-                     va(origTopicSubscriber, retryWaitTime));
-        channelManager.submitOpAfterDelay(origSubData, retryWaitTime);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
deleted file mode 100644
index c7a71cb..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class WriteCallback implements ChannelFutureListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(WriteCallback.class);
-
-    // Private member variables
-    private PubSubData pubSubData;
-    private final HChannelManager channelManager;
-
-    // Constructor
-    public WriteCallback(PubSubData pubSubData,
-                         HChannelManager channelManager) {
-        super();
-        this.pubSubData = pubSubData;
-        this.channelManager = channelManager;
-    }
-
-    public void operationComplete(ChannelFuture future) throws Exception {
-        // If the client has stopped, there is no need to proceed
-        // with any callback logic here.
-        if (channelManager.isClosed()) {
-            future.getChannel().close();
-            return;
-        }
-
-        // When the write operation to the server is done, we just need to check
-        // if it was successful or not.
-        InetSocketAddress host = NetUtils.getHostFromChannel(future.getChannel());
-        if (!future.isSuccess()) {
-            logger.error("Error writing on channel to host: {}", host);
-            // On a write failure for a PubSubRequest, we also want to remove
-            // the saved txnId to PubSubData in the ResponseHandler. These
-            // requests will not receive an ack response from the server
-            // so there is no point storing that information there anymore.
-            try {
-                HChannelHandler channelHandler = 
-                    HChannelImpl.getHChannelHandlerFromChannel(future.getChannel());
-                channelHandler.removeTxn(pubSubData.txnId);
-                channelHandler.closeExplicitly();
-            } catch (NoResponseHandlerException e) {
-                // We just couldn't remove the transaction ID's mapping.
-                // The handler was null, so this has been reset anyway.
-                logger.warn("Could not find response handler to remove txnId mapping to pubsub data. Ignoring.");
-            }
-
-            future.getChannel().close();
-
-            // If we were not able to write on the channel to the server host,
-            // the host could have died or something is wrong with the channel
-            // connection where we can connect to the host, but not write to it.
-            ByteString hostString = (host == null) ? null : ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
-            if (pubSubData.writeFailedServers != null && pubSubData.writeFailedServers.contains(hostString)) {
-                // We've already tried to write to this server previously and
-                // failed, so invoke the operationFailed callback.
-                logger.error("Error writing to host more than once so just invoke the operationFailed callback!");
-                pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
-                                                        "Error while writing message to server: " + hostString));
-            } else {
-                logger.debug("Try to send the PubSubRequest again to the default server host/VIP for pubSubData: {}",
-                    pubSubData);
-                // Keep track of this current server that we failed to write to
-                // but retry the request on the default server host/VIP.
-                if (pubSubData.writeFailedServers == null)
-                    pubSubData.writeFailedServers = new LinkedList<ByteString>();
-                pubSubData.writeFailedServers.add(hostString);
-                channelManager.submitOpToDefaultServer(pubSubData);
-            }
-        } else {
-            // Now that the write to the server is done, we have to wait for it
-            // to respond. The ResponseHandler will take care of the ack
-            // response from the server before we can determine if the async
-            // PubSub call has really completed successfully or not.
-            logger.debug("Successfully wrote to host: {} for pubSubData: {}", host, pubSubData);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
deleted file mode 100644
index caa0734..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.multiplex;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.client.netty.CleanupChannelMap;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.client.netty.impl.HChannelHandler;
-import org.apache.hedwig.client.netty.impl.HChannelImpl;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-
-import static org.apache.hedwig.util.VarArgs.va;
-
-
-/**
- * Multiplex HChannel Manager which establish a connection for multi subscriptions.
- */
-public class MultiplexHChannelManager extends AbstractHChannelManager {
-
-    static final Logger logger = LoggerFactory.getLogger(MultiplexHChannelManager.class);
-
-    // Find which HChannel that a given TopicSubscriber used.
-    protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;
-
-    // A index map for each topic subscriber is served by which subscription channel
-    protected final CleanupChannelMap<TopicSubscriber> sub2Channels;
-
-    // Concurrent Map to store Message handler for each topic + sub id combination.
-    // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
-    // user set when connection is recovered
-    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
-        = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
-
-    // PipelineFactory to create subscription netty channels to the appropriate server
-    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
-
-    public MultiplexHChannelManager(ClientConfiguration cfg,
-                                    ChannelFactory socketFactory) {
-        super(cfg, socketFactory);
-        subscriptionChannels = new CleanupChannelMap<InetSocketAddress>();
-        sub2Channels = new CleanupChannelMap<TopicSubscriber>();
-        subscriptionChannelPipelineFactory =
-            new MultiplexSubscriptionChannelPipelineFactory(cfg, this);
-    }
-
-    @Override
-    protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
-        return subscriptionChannelPipelineFactory;
-    }
-
-    @Override
-    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
-        // store the channel connected to target host for future usage
-        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
-        HChannel newHChannel = new HChannelImpl(host, channel, this,
-                                                getSubscriptionChannelPipelineFactory());
-        return storeSubscriptionChannel(host, newHChannel);
-    }
-
-    @Override
-    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
-        HChannel newHChannel = new HChannelImpl(host, this,
-                                                getSubscriptionChannelPipelineFactory());
-        return storeSubscriptionChannel(host, newHChannel);
-    }
-
-    private HChannel storeSubscriptionChannel(InetSocketAddress host,
-                                              HChannel newHChannel) {
-        // here, we guarantee there is only one channel used to communicate with target
-        // host.
-        return subscriptionChannels.addChannel(host, newHChannel);
-    }
-
-    @Override
-    protected HChannel getSubscriptionChannel(InetSocketAddress host) {
-        return subscriptionChannels.getChannel(host);
-    }
-
-    protected HChannel getSubscriptionChannel(TopicSubscriber subscriber) {
-        InetSocketAddress host = topic2Host.get(subscriber.getTopic());
-        if (null == host) {
-            // we don't know where is the owner of the topic
-            return null;
-        } else {
-            return getSubscriptionChannel(host);
-        }
-    }
-
-    @Override
-    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
-        InetSocketAddress host = topic2Host.get(subscriber.getTopic());
-        if (null == host) {
-            // we don't know where is the topic
-            return null;
-        } else {
-            // we had know which server owned the topic
-            HChannel channel = getSubscriptionChannel(host);
-            if (null == channel) {
-                // create a channel to connect to sepcified host
-                channel = createAndStoreSubscriptionChannel(host);
-            }
-            return channel;
-        }
-    }
-
-    @Override
-    protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
-                                                     Channel channel) {
-        HChannel hChannel = subscriptionChannels.getChannel(host);
-        if (null == hChannel) {
-            return;
-        }
-        Channel underlyingChannel = hChannel.getChannel();
-        if (null == underlyingChannel ||
-            !underlyingChannel.equals(channel)) {
-            return;
-        }
-        logger.info("Subscription Channel {} disconnected from {}.",
-                    va(channel, host));
-        // remove existed channel
-        if (subscriptionChannels.removeChannel(host, hChannel)) {
-            try {
-                HChannelHandler channelHandler =
-                    HChannelImpl.getHChannelHandlerFromChannel(channel);
-                channelHandler.getSubscribeResponseHandler()
-                              .onChannelDisconnected(host, channel);
-            } catch (NoResponseHandlerException nrhe) {
-                logger.warn("No Channel Handler found for channel {} when it disconnected.",
-                            channel);
-            }
-        }
-    }
-
-    @Override
-    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
-        HChannel hChannel = getSubscriptionChannel(topicSubscriber);
-        if (null == hChannel) {
-            return null;
-        }
-        Channel channel = hChannel.getChannel();
-        if (null == channel) {
-            return null;
-        }
-        try {
-            HChannelHandler channelHandler =
-                HChannelImpl.getHChannelHandlerFromChannel(channel);
-            return channelHandler.getSubscribeResponseHandler();
-        } catch (NoResponseHandlerException nrhe) {
-            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
-                        channel, topicSubscriber);
-            return null;
-        }
-    }
-
-    @Override
-    public void startDelivery(TopicSubscriber topicSubscriber,
-                              MessageHandler messageHandler)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        startDelivery(topicSubscriber, messageHandler, false);
-    }
-
-    @Override
-    protected void restartDelivery(TopicSubscriber topicSubscriber)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        startDelivery(topicSubscriber, null, true);
-    }
-
-    private void startDelivery(TopicSubscriber topicSubscriber,
-                               MessageHandler messageHandler,
-                               boolean restart)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        // Make sure we know about this topic subscription on the client side
-        // exists. The assumption is that the client should have in memory the
-        // Channel created for the TopicSubscriber once the server has sent
-        // an ack response to the initial subscribe request.
-        SubscribeResponseHandler subscribeResponseHandler =
-            getSubscribeResponseHandler(topicSubscriber);
-        if (null == subscribeResponseHandler ||
-            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
-            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
-            throw new ClientNotSubscribedException("Client is not yet subscribed to "
-                                                   + topicSubscriber);
-        }
-
-        MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
-        if (restart) {
-            // restart using existing msg handler 
-            messageHandler = existedMsgHandler;
-        } else {
-            // some has started delivery but not stop it
-            if (null != existedMsgHandler) {
-                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
-            }
-            if (messageHandler != null) {
-                if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
-                    throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
-                }
-            }
-        }
-
-        // tell subscribe response handler to start delivering messages for topicSubscriber
-        subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
-    }
-
-    public void stopDelivery(TopicSubscriber topicSubscriber)
-    throws ClientNotSubscribedException {
-        // Make sure we know that this topic subscription on the client side
-        // exists. The assumption is that the client should have in memory the
-        // Channel created for the TopicSubscriber once the server has sent
-        // an ack response to the initial subscribe request.
-        SubscribeResponseHandler subscribeResponseHandler =
-            getSubscribeResponseHandler(topicSubscriber);
-        if (null == subscribeResponseHandler ||
-            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
-            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
-            throw new ClientNotSubscribedException("Client is not yet subscribed to "
-                                                   + topicSubscriber);
-        }
-
-        // tell subscribe response handler to stop delivering messages for a given topic subscriber
-        topicSubscriber2MessageHandler.remove(topicSubscriber);
-        subscribeResponseHandler.stopDelivery(topicSubscriber);
-    }
-                            
-
-    @Override
-    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
-                                       final Callback<ResponseBody> callback,
-                                       final Object context) {
-        SubscribeResponseHandler subscribeResponseHandler =
-            getSubscribeResponseHandler(topicSubscriber);
-        if (null == subscribeResponseHandler ||
-            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
-            logger.warn("Trying to close a subscription when we don't have a subscription channel cached for {}",
-                        topicSubscriber);
-            callback.operationFinished(context, (ResponseBody)null);
-            return;
-        }
-        subscribeResponseHandler.asyncCloseSubscription(topicSubscriber, callback, context);
-    }
-
-    @Override
-    protected void checkTimeoutRequestsOnSubscriptionChannels() {
-        // timeout task may be started before constructing subscriptionChannels
-        if (null == subscriptionChannels) {
-            return;
-        }
-        for (HChannel channel : subscriptionChannels.getChannels()) {
-            try {
-                HChannelHandler channelHandler =
-                    HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
-                channelHandler.checkTimeoutRequests();
-            } catch (NoResponseHandlerException nrhe) {
-                continue;
-            }
-        }
-    }
-
-    @Override
-    protected void closeSubscriptionChannels() {
-        subscriptionChannels.close();
-    }
-
-    protected Either<Boolean, HChannel> storeSubscriptionChannel(
-        TopicSubscriber topicSubscriber, PubSubData txn, HChannel channel) {
-        boolean replaced = sub2Channels.replaceChannel(
-            topicSubscriber, txn.getOriginalChannelForResubscribe(), channel);
-        if (replaced) {
-            return Either.of(replaced, channel);
-        } else {
-            return Either.of(replaced, null);
-        }
-    }
-
-    protected boolean removeSubscriptionChannel(
-        TopicSubscriber topicSubscriber, HChannel channel) {
-        return sub2Channels.removeChannel(topicSubscriber, channel);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
deleted file mode 100644
index 9b8ebe0..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.multiplex;
-
-import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
-import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public class MultiplexSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
-
-    private static final Logger logger =
-        LoggerFactory.getLogger(MultiplexSubscribeResponseHandler.class);
-
-    // the underlying subscription channel
-    volatile HChannel hChannel;
-    private final MultiplexHChannelManager sChannelManager;
-
-    protected MultiplexSubscribeResponseHandler(ClientConfiguration cfg,
-                                                HChannelManager channelManager) {
-        super(cfg, channelManager);
-        sChannelManager = (MultiplexHChannelManager) channelManager;
-    }
-
-    @Override
-    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
-                               Channel channel) throws Exception {
-        if (null == hChannel) {
-            InetSocketAddress host = NetUtils.getHostFromChannel(channel);
-            hChannel = sChannelManager.getSubscriptionChannel(host);
-            if (null == hChannel ||
-                !channel.equals(hChannel.getChannel())) {
-                PubSubException pse =
-                    new UnexpectedConditionException("Failed to get subscription channel of " + host);
-                pubSubData.getCallback().operationFailed(pubSubData.context, pse);
-                return;
-            }
-        }
-        super.handleResponse(response, pubSubData, channel);
-    }
-
-    @Override
-    protected Either<StatusCode, HChannel> handleSuccessResponse(
-        TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
-        // Store the mapping for the TopicSubscriber to the Channel.
-        // This is so we can control the starting and stopping of
-        // message deliveries from the server on that Channel. Store
-        // this only on a successful ack response from the server.
-        Either<Boolean, HChannel> result =
-            sChannelManager.storeSubscriptionChannel(ts, pubSubData, hChannel);
-        if (result.left()) {
-            return Either.of(StatusCode.SUCCESS, result.right());
-        } else {
-            StatusCode code;
-            if (pubSubData.isResubscribeRequest()) {
-                code = StatusCode.RESUBSCRIBE_EXCEPTION;
-            } else {
-                code = StatusCode.CLIENT_ALREADY_SUBSCRIBED;
-            }
-            return Either.of(code, null);
-        }
-    }
-
-    @Override
-    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
-                                       final Callback<ResponseBody> callback,
-                                       final Object context) {
-        final ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
-        if (null == ss || null == hChannel) {
-            logger.debug("No subscription {} found when closing its subscription from {}.",
-                         va(topicSubscriber, hChannel));
-            callback.operationFinished(context, (ResponseBody)null);
-            return;
-        }
-        Callback<ResponseBody> closeCb = new Callback<ResponseBody>() {
-            @Override
-            public void operationFinished(Object ctx, ResponseBody respBody) {
-                removeSubscription(topicSubscriber, ss);
-                sChannelManager.removeSubscriptionChannel(topicSubscriber, hChannel);
-                callback.operationFinished(context, null);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                callback.operationFailed(context, exception);
-            }
-        };
-        PubSubData closeOp = new PubSubData(topicSubscriber.getTopic(), null,
-                                            topicSubscriber.getSubscriberId(),
-                                            OperationType.CLOSESUBSCRIPTION,
-                                            null, closeCb, context);
-        hChannel.submitOp(closeOp);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
deleted file mode 100644
index c43108a..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.multiplex;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.client.netty.impl.HChannelHandler;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-public class MultiplexSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
-
-    public MultiplexSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
-                                                       MultiplexHChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    @Override
-    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
-        Map<OperationType, AbstractResponseHandler> handlers =
-            new HashMap<OperationType, AbstractResponseHandler>();
-        handlers.put(OperationType.SUBSCRIBE,
-                     new MultiplexSubscribeResponseHandler(cfg, channelManager));
-        handlers.put(OperationType.CLOSESUBSCRIPTION,
-                     new CloseSubscriptionResponseHandler(cfg, channelManager));
-        return handlers;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
deleted file mode 100644
index fb0a7d9..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.simple;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.client.netty.CleanupChannelMap;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.client.netty.impl.HChannelHandler;
-import org.apache.hedwig.client.netty.impl.HChannelImpl;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * Simple HChannel Manager which establish a connection for each subscription.
- */
-public class SimpleHChannelManager extends AbstractHChannelManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(SimpleHChannelManager.class);
-
-    // Concurrent Map to store the cached Channel connections on the client side
-    // to a server host for a given Topic + SubscriberId combination. For each
-    // TopicSubscriber, we want a unique Channel connection to the server for
-    // it. We can also get the ResponseHandler tied to the Channel via the
-    // Channel Pipeline.
-    protected final CleanupChannelMap<TopicSubscriber> topicSubscriber2Channel;
-
-    // Concurrent Map to store Message handler for each topic + sub id combination.
-    // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
-    // user set when connection is recovered
-    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
-        = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
-
-    // PipelineFactory to create subscription netty channels to the appropriate server
-    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
-
-    public SimpleHChannelManager(ClientConfiguration cfg,
-                                 ChannelFactory socketFactory) {
-        super(cfg, socketFactory);
-        topicSubscriber2Channel = new CleanupChannelMap<TopicSubscriber>();
-        this.subscriptionChannelPipelineFactory =
-            new SimpleSubscriptionChannelPipelineFactory(cfg, this);
-    }
-
-    @Override
-    public void submitOp(final PubSubData pubSubData) {
-        /**
-         * In the simple hchannel implementation that if a client closes a subscription
-         * and tries to attach to it immediately, it could get a TOPIC_BUSY response. This
-         * is because, a subscription is closed simply by closing the channel, and the hub
-         * side may not have been notified of the channel disconnection event by the time
-         * the new subscription request comes in. To solve this, retry up to 5 times.
-         * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-513}
-         */
-        if (OperationType.SUBSCRIBE.equals(pubSubData.operationType)) {
-            final Callback<ResponseBody> origCb = pubSubData.getCallback();
-            final AtomicInteger retries = new AtomicInteger(5);
-            final Callback<ResponseBody> wrapperCb
-                = new Callback<ResponseBody>() {
-                @Override
-                public void operationFinished(Object ctx,
-                                              ResponseBody resultOfOperation) {
-                    origCb.operationFinished(ctx, resultOfOperation);
-                }
-
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    if (exception instanceof ServiceDownException
-                        && exception.getCause() instanceof TopicBusyException
-                        && retries.decrementAndGet() > 0) {
-                        logger.warn("TOPIC_DOWN from server using simple channel scheme."
-                                    + "This could be due to the channel disconnection from a close"
-                                    + " not having been triggered on the server side. Retrying");
-                        SimpleHChannelManager.super.submitOp(pubSubData);
-                        return;
-                    }
-                    origCb.operationFailed(ctx, exception);
-                }
-            };
-            pubSubData.setCallback(wrapperCb);
-        }
-        super.submitOp(pubSubData);
-    }
-
-    @Override
-    protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
-        return subscriptionChannelPipelineFactory;
-    }
-
-    @Override
-    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
-        // for simple channel, we don't store subscription channel now
-        // we store it until we received success response
-        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
-        return new HChannelImpl(host, channel, this,
-                                getSubscriptionChannelPipelineFactory());
-    }
-
-    @Override
-    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
-        // for simple channel, we don't store subscription channel now
-        // we store it until we received success response
-        return new HChannelImpl(host, this,
-                                getSubscriptionChannelPipelineFactory());
-    }
-
-    protected Either<Boolean, HChannel> storeSubscriptionChannel(
-        TopicSubscriber topicSubscriber, PubSubData txn, Channel channel) {
-        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
-        HChannel newHChannel = new HChannelImpl(host, channel, this,
-                                                getSubscriptionChannelPipelineFactory());
-        boolean replaced = topicSubscriber2Channel.replaceChannel(
-            topicSubscriber, txn.getOriginalChannelForResubscribe(), newHChannel);
-        if (replaced) {
-            return Either.of(replaced, newHChannel);
-        } else {
-            return Either.of(replaced, null);
-        }
-    }
-
-    /**
-     * Always returns {@code null}, whatever {@code host} is passed. Channel
-     * lookups in this class go through {@code topicSubscriber2Channel}, which
-     * is keyed by topic subscriber.
-     *
-     * @param host ignored
-     * @return {@code null}
-     */
-    @Override
-    protected HChannel getSubscriptionChannel(InetSocketAddress host) {
-        return null;
-    }
-
-    @Override
-    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
-        HChannel channel = topicSubscriber2Channel.getChannel(subscriber);
-        if (null != channel) {
-            // there is no channel established for this subscription
-            return channel;
-        } else {
-            InetSocketAddress host = topic2Host.get(subscriber.getTopic());
-            if (null == host) {
-                return null;
-            } else {
-                channel = getSubscriptionChannel(host);
-                if (null == channel) {
-                    channel = createAndStoreSubscriptionChannel(host);
-                }
-                return channel;
-            }
-        }
-    }
-
-    /**
-     * Logs the disconnection and forwards it to the subscribe response
-     * handler attached to the channel. When the channel has no handler, a
-     * warning is logged instead.
-     *
-     * @param host    the host the channel was connected to
-     * @param channel the netty channel that disconnected
-     */
-    @Override
-    protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
-                                                     Channel channel) {
-        logger.info("Subscription Channel {} disconnected from {}.",
-                    va(channel, host));
-        try {
-            // resolve the hchannel handler and notify its subscribe handler
-            HChannelImpl.getHChannelHandlerFromChannel(channel)
-                        .getSubscribeResponseHandler()
-                        .onChannelDisconnected(host, channel);
-        } catch (NoResponseHandlerException nrhe) {
-            logger.warn("No Channel Handler found for channel {} when it disconnected.",
-                        channel);
-        }
-    }
-
-    /**
-     * Looks up the subscribe response handler serving a topic subscriber.
-     *
-     * @param topicSubscriber the subscription to look up
-     * @return the handler attached to the subscriber's channel, or
-     *         {@code null} when no channel is registered, the registered
-     *         channel has no netty channel, or the netty channel has no
-     *         channel handler
-     */
-    @Override
-    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
-        final HChannel registered = topicSubscriber2Channel.getChannel(topicSubscriber);
-        final Channel nettyChannel = (null == registered) ? null : registered.getChannel();
-        if (null == nettyChannel) {
-            return null;
-        }
-        try {
-            return HChannelImpl.getHChannelHandlerFromChannel(nettyChannel)
-                               .getSubscribeResponseHandler();
-        } catch (NoResponseHandlerException nrhe) {
-            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
-                        nettyChannel, topicSubscriber);
-            return null;
-        }
-    }
-
-    /**
-     * Starts message delivery for a topic subscriber with the supplied
-     * handler. Delegates to the private three-argument overload with
-     * {@code restart} set to {@code false}.
-     *
-     * @param topicSubscriber the subscription to start delivery for
-     * @param messageHandler  the handler passed on to the overload
-     * @throws ClientNotSubscribedException  propagated from the overload
-     * @throws AlreadyStartDeliveryException propagated from the overload
-     */
-    @Override
-    public void startDelivery(TopicSubscriber topicSubscriber,
-                              MessageHandler messageHandler)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        startDelivery(topicSubscriber, messageHandler, false);
-    }
-
-    /**
-     * Restarts message delivery for a topic subscriber. Delegates to the
-     * private three-argument overload with a {@code null} handler and
-     * {@code restart} set to {@code true}; in that mode the overload uses the
-     * handler found in {@code topicSubscriber2MessageHandler}.
-     *
-     * @param topicSubscriber the subscription to restart delivery for
-     * @throws ClientNotSubscribedException  propagated from the overload
-     * @throws AlreadyStartDeliveryException propagated from the overload
-     */
-    @Override
-    protected void restartDelivery(TopicSubscriber topicSubscriber)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        startDelivery(topicSubscriber, null, true);
-    }
-
-    private void startDelivery(TopicSubscriber topicSubscriber,
-                               MessageHandler messageHandler, boolean restart)
-        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
-        // Make sure we know about this topic subscription on the client side
-        // exists. The assumption is that the client should have in memory the
-        // Channel created for the TopicSubscriber once the server has sent
-        // an ack response to the initial subscribe request.
-        SubscribeResponseHandler subscribeResponseHandler =
-            getSubscribeResponseHandler(topicSubscriber);
-        if (null == subscribeResponseHandler ||
-            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
-            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
-            throw new ClientNotSubscribedException("Client is not yet subscribed to "
-                                                   + topicSubscriber);
-        }
-
-        MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
-        if (restart) {
-            // restart using existing msg handler 
-            messageHandler = existedMsgHandler;
-        } else {
-            // some has started delivery but not stop it
-            if (null != existedMsgHandler) {
-                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
-            }
-            if (messageHandler != null) {
-                if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
-                    throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
-                }
-            }
-        }
-
-        // tell subscribe response handler to start delivering messages for topicSubscriber
-        subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
-    }
-
-    /**
-     * Stops message delivery for a topic subscriber: removes its entry from
-     * {@code topicSubscriber2MessageHandler} and then asks the subscribe
-     * response handler to stop delivery.
-     *
-     * @param topicSubscriber the subscription to stop delivery for
-     * @throws ClientNotSubscribedException if no subscribe response handler
-     *         is found for the subscriber or it reports no subscription
-     */
-    public void stopDelivery(TopicSubscriber topicSubscriber)
-        throws ClientNotSubscribedException {
-        // The client is expected to hold, in memory, the Channel created for
-        // the TopicSubscriber once the server has acked the initial subscribe
-        // request; make sure that subscription is actually known here.
-        final SubscribeResponseHandler responseHandler =
-            getSubscribeResponseHandler(topicSubscriber);
-        final boolean subscribed =
-            null != responseHandler && responseHandler.hasSubscription(topicSubscriber);
-        if (!subscribed) {
-            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
-            throw new ClientNotSubscribedException("Client is not yet subscribed to "
-                                                   + topicSubscriber);
-        }
-
-        // forget the handler first, then stop delivery for this subscriber
-        topicSubscriber2MessageHandler.remove(topicSubscriber);
-        responseHandler.stopDelivery(topicSubscriber);
-    }
-                            
-
-    /**
-     * Asynchronously closes the channel registered for a topic subscriber.
-     *
-     * The subscriber's entry is removed from {@code topicSubscriber2Channel}
-     * first. If no channel was registered, or the registered channel has no
-     * netty channel, {@code callback.operationFinished} is invoked right away
-     * with a {@code null} result. Otherwise {@code closeExplicitly()} is
-     * called on the channel's handler (when one is found), the netty channel
-     * is closed, and the callback is completed from the close future.
-     *
-     * @param topicSubscriber the subscription whose channel is closed
-     * @param callback        notified with {@code operationFinished} on
-     *                        success, or {@code operationFailed} carrying a
-     *                        {@code ServiceDownException} when the close
-     *                        future fails
-     * @param context         opaque context handed back to {@code callback}
-     */
-    @Override
-    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
-                                       final Callback<ResponseBody> callback,
-                                       final Object context) {
-        HChannel hChannel = topicSubscriber2Channel.removeChannel(topicSubscriber);
-        if (null == hChannel) {
-            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for {}",
-                        topicSubscriber);
-            callback.operationFinished(context, (ResponseBody)null);
-            return;
-        }
-
-        Channel channel = hChannel.getChannel();
-        if (null == channel) {
-            callback.operationFinished(context, (ResponseBody)null);
-            return;
-        }
-
-        try {
-            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
-        } catch (NoResponseHandlerException nrhe) {
-            // Argument order follows the placeholders: the subscriber first
-            // ("closing {}'s"), then its channel ("channel {}").
-            logger.warn("No Channel Handler found when closing {}'s channel {}.",
-                        topicSubscriber, channel);
-        }
-        ChannelFuture future = channel.close();
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (!future.isSuccess()) {
-                    logger.error("Failed to close the subscription channel for {}",
-                                 topicSubscriber);
-                    callback.operationFailed(context, new ServiceDownException(
-                        "Failed to close the subscription channel for " + topicSubscriber));
-                } else {
-                    callback.operationFinished(context, (ResponseBody)null);
-                }
-            }
-        });
-    }
-
-    /**
-     * Asks the handler of every channel in {@code topicSubscriber2Channel} to
-     * check its timed-out requests. Channels without a handler are skipped.
-     */
-    @Override
-    protected void checkTimeoutRequestsOnSubscriptionChannels() {
-        // the timeout task may be started before topicSubscriber2Channel is
-        // constructed, in which case there is nothing to check yet
-        if (null != topicSubscriber2Channel) {
-            for (HChannel hChannel : topicSubscriber2Channel.getChannels()) {
-                try {
-                    HChannelImpl.getHChannelHandlerFromChannel(hChannel.getChannel())
-                                .checkTimeoutRequests();
-                } catch (NoResponseHandlerException nrhe) {
-                    // no handler on this channel; move on to the next one
-                }
-            }
-        }
-    }
-
-    /**
-     * Closes the subscription channels by delegating to
-     * {@code topicSubscriber2Channel.close()}.
-     */
-    @Override
-    protected void closeSubscriptionChannels() {
-        topicSubscriber2Channel.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
deleted file mode 100644
index ee5bd90..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.simple;
-
-
-import java.util.Set;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
-import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
-import org.apache.hedwig.client.netty.impl.HChannelImpl;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-
-public class SimpleSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
-
-    private static final Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class);
-
-    /**
-     * Simple Active Subscriber enabling client-side throttling.
-     */
-    static class SimpleActiveSubscriber extends ActiveSubscriber {
-
-        // Set to store all of the outstanding subscribed messages that are pending
-        // to be consumed by the client app's MessageHandler. If this ever grows too
-        // big (e.g. problem at the client end for message consumption), we can
-        // throttle things by temporarily setting the Subscribe Netty Channel
-        // to not be readable. When the Set has shrunk sufficiently, we can turn the
-        // channel back on to read new messages.
-        private final Set<Message> outstandingMsgSet;
-
-        public SimpleActiveSubscriber(ClientConfiguration cfg,
-                                      AbstractHChannelManager channelManager,
-                                      TopicSubscriber ts, PubSubData op,
-                                      SubscriptionPreferences preferences,
-                                      Channel channel,
-                                      HChannel hChannel) {
-            super(cfg, channelManager, ts, op, preferences, channel, hChannel);
-            outstandingMsgSet = Collections.newSetFromMap(
-                    new ConcurrentHashMap<Message, Boolean>(
-                            cfg.getMaximumOutstandingMessages(), 1.0f));
-        }
-
-        @Override
-        protected void unsafeDeliverMessage(Message message) {
-            // Add this "pending to be consumed" message to the outstandingMsgSet.
-            outstandingMsgSet.add(message);
-            // Check if we've exceeded the max size for the outstanding message set.
-            if (outstandingMsgSet.size() >= cfg.getMaximumOutstandingMessages() &&
-                channel.isReadable()) {
-                // Too many outstanding messages so throttle it by setting the Netty
-                // Channel to not be readable.
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel",
-                                 outstandingMsgSet.size());
-                }
-                channel.setReadable(false);
-            }
-            super.unsafeDeliverMessage(message);
-        }
-
-        @Override
-        public synchronized void messageConsumed(Message message) {
-            super.messageConsumed(message);
-            // Remove this consumed message from the outstanding Message Set.
-            outstandingMsgSet.remove(message);
-            // Check if we throttled message consumption previously when the
-            // outstanding message limit was reached. For now, only turn the
-            // delivery back on if there are no more outstanding messages to
-            // consume. We could make this a configurable parameter if needed.
-            if (!channel.isReadable() && outstandingMsgSet.size() == 0) {
-                if (logger.isDebugEnabled())
-                    logger.debug("Message consumption has caught up so okay to turn off"
-                                 + " throttling of messages on the subscribe channel for {}",
-                                 topicSubscriber);
-                channel.setReadable(true);
-            }
-        }
-
-        @Override
-        public synchronized void startDelivery(MessageHandler messageHandler)
-        throws AlreadyStartDeliveryException, ClientNotSubscribedException {
-            super.startDelivery(messageHandler);
-            // Now make the TopicSubscriber Channel readable (it is set to not be
-            // readable when the initial subscription is done). Note that this is an
-            // asynchronous call. If this fails (not likely), the futureListener
-            // will just log an error message for now.
-            ChannelFuture future = channel.setReadable(true);
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (!future.isSuccess()) {
-                        logger.error("Unable to make subscriber Channel readable in startDelivery call for {}",
-                                     topicSubscriber);
-                    }
-                }
-            });
-        }
-
-        @Override
-        public synchronized void stopDelivery() {
-            super.stopDelivery();
-            // Now make the TopicSubscriber channel not-readable. This will buffer
-            // up messages if any are sent from the server. Note that this is an
-            // asynchronous call. If this fails (not likely), the futureListener
-            // will just log an error message for now.
-            ChannelFuture future = channel.setReadable(false);
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (!future.isSuccess()) {
-                        logger.error("Unable to make subscriber Channel not readable in stopDelivery call for {}",
-                                     topicSubscriber);
-                    }
-                }
-            });
-        }
-
-    }
-
-    // Track which subscriber is alive in this response handler
-    // Which is used for backward compat, since old version hub
-    // server doesn't carry (topic, subscriberid) in each message.
-    private volatile TopicSubscriber origTopicSubscriber;
-    private volatile ActiveSubscriber origActiveSubscriber;
-
-    private SimpleHChannelManager sChannelManager;
-
-    protected SimpleSubscribeResponseHandler(ClientConfiguration cfg,
-                                             HChannelManager channelManager) {
-        super(cfg, channelManager);
-        sChannelManager = (SimpleHChannelManager) channelManager;
-    }
-
-    @Override
-    protected ActiveSubscriber createActiveSubscriber(
-        ClientConfiguration cfg, AbstractHChannelManager channelManager,
-        TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
-        Channel channel, HChannel hChannel) {
-        return new SimpleActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
-    }
-
-    @Override
-    protected synchronized ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
-        if (null == origTopicSubscriber || !origTopicSubscriber.equals(ts)) {
-            return null;
-        }
-        return origActiveSubscriber;
-    }
-
-    private synchronized ActiveSubscriber getActiveSubscriber() {
-        return origActiveSubscriber;
-    }
-
-    @Override
-    public synchronized boolean hasSubscription(TopicSubscriber ts) {
-        if (null == origTopicSubscriber) {
-            return false;
-        }
-        return origTopicSubscriber.equals(ts);
-    }
-
-    @Override
-    protected synchronized boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
-        if (null != origTopicSubscriber && !origTopicSubscriber.equals(ts)) {
-            return false;
-        }
-        origTopicSubscriber = null;
-        origActiveSubscriber = null;
-        return super.removeSubscription(ts, ss);
-    }
-
-    @Override
-    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
-                               Channel channel) throws Exception {
-        // If this was not a successful response to the Subscribe request, we
-        // won't be using the Netty Channel created so just close it.
-        if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
-            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
-            channel.close();
-        }
-        super.handleResponse(response, pubSubData, channel);
-    }
-
-    @Override
-    public void handleSubscribeMessage(PubSubResponse response) {
-        Message message = response.getMessage();
-        ActiveSubscriber ss = getActiveSubscriber();
-        if (null == ss) {
-            logger.error("No Subscriber is alive receiving its message {}.",
-                         MessageIdUtils.msgIdToReadableString(message.getMsgId()));
-            return;
-        }
-        ss.handleMessage(message);
-    }
-
-    @Override
-    protected Either<StatusCode, HChannel> handleSuccessResponse(
-        TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
-        // Store the mapping for the TopicSubscriber to the Channel.
-        // This is so we can control the starting and stopping of
-        // message deliveries from the server on that Channel. Store
-        // this only on a successful ack response from the server.
-        Either<Boolean, HChannel> result =
-            sChannelManager.storeSubscriptionChannel(ts, pubSubData, channel);
-        if (result.left()) {
-            return Either.of(StatusCode.SUCCESS, result.right());
-        } else {
-            StatusCode code;
-            if (pubSubData.isResubscribeRequest()) {
-                code = StatusCode.RESUBSCRIBE_EXCEPTION;
-            } else {
-                code = StatusCode.CLIENT_ALREADY_SUBSCRIBED;
-            }
-            return Either.of(code, null);
-        }
-    }
-
-    @Override
-    protected synchronized void postHandleSuccessResponse(
-        TopicSubscriber ts, ActiveSubscriber as) {
-        origTopicSubscriber = ts;
-        origActiveSubscriber = as;
-    }
-
-    @Override
-    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
-                                       final Callback<ResponseBody> callback,
-                                       final Object context) {
-        // nothing to do just clear status
-        // channel manager takes the responsibility to close the channel
-        callback.operationFinished(context, (ResponseBody)null);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
deleted file mode 100644
index d14f053..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.simple;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-/**
- * Pipeline factory for subscription channels created by
- * {@link SimpleHChannelManager}. It registers response handlers for the
- * {@code SUBSCRIBE} and {@code CLOSESUBSCRIPTION} operations.
- */
-public class SimpleSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
-
-    /**
-     * @param cfg            client configuration passed to the superclass
-     * @param channelManager the simple channel manager passed to the superclass
-     */
-    public SimpleSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
-                                                    SimpleHChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    /**
-     * Builds the operation-type to response-handler map for a subscription
-     * channel.
-     *
-     * @return a new map holding a {@link SimpleSubscribeResponseHandler} for
-     *         {@code SUBSCRIBE} and a {@link CloseSubscriptionResponseHandler}
-     *         for {@code CLOSESUBSCRIPTION}
-     */
-    @Override
-    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
-        final AbstractResponseHandler subscribeHandler =
-            new SimpleSubscribeResponseHandler(cfg, channelManager);
-        final AbstractResponseHandler closeSubscriptionHandler =
-            new CloseSubscriptionResponseHandler(cfg, channelManager);
-
-        final Map<OperationType, AbstractResponseHandler> byOperation =
-            new HashMap<OperationType, AbstractResponseHandler>();
-        byOperation.put(OperationType.SUBSCRIBE, subscribeHandler);
-        byOperation.put(OperationType.CLOSESUBSCRIPTION, closeSubscriptionHandler);
-        return byOperation;
-    }
-
-}