You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:32 UTC
[22/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
deleted file mode 100644
index fd58747..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
-import java.util.LinkedList;
-import java.util.Queue;
-
-import com.google.protobuf.ByteString;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import static org.apache.hedwig.util.VarArgs.va;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provide a wrapper over netty channel for Hedwig operations.
- */
-public class HChannelImpl implements HChannel {
-
- private static Logger logger = LoggerFactory.getLogger(HChannelImpl.class);
-
- enum State {
- DISCONNECTED,
- CONNECTING,
- CONNECTED,
- };
-
- InetSocketAddress host;
- final AbstractHChannelManager channelManager;
- final ClientChannelPipelineFactory pipelineFactory;
- volatile Channel channel;
- volatile State state;
-
- // Indicates whether the channel is closed or not.
- volatile boolean closed = false;
- // Queue the pubsub requests when the channel is not connected.
- Queue<PubSubData> pendingOps = new ArrayDeque<PubSubData>();
-
- /**
- * Create a un-established channel with provided target <code>host</code>.
- *
- * @param host
- * Target host address.
- * @param channelManager
- * Channel manager manages the channels.
- */
- protected HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager) {
- this(host, channelManager, null);
- }
-
- public HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager,
- ClientChannelPipelineFactory pipelineFactory) {
- this(host, null, channelManager, pipelineFactory);
- state = State.DISCONNECTED;
- }
-
- /**
- * Create a <code>HChannel</code> with an established netty channel.
- *
- * @param host
- * Target host address.
- * @param channel
- * Established Netty channel.
- * @param channelManager
- * Channel manager manages the channels.
- */
- public HChannelImpl(InetSocketAddress host, Channel channel,
- AbstractHChannelManager channelManager,
- ClientChannelPipelineFactory pipelineFactory) {
- this.host = host;
- this.channel = channel;
- this.channelManager = channelManager;
- this.pipelineFactory = pipelineFactory;
- state = State.CONNECTED;
- }
-
- @Override
- public void submitOp(PubSubData pubSubData) {
- boolean doOpNow = false;
-
- // common case without lock first
- if (null != channel && State.CONNECTED == state) {
- doOpNow = true;
- } else {
- synchronized (this) {
- // check channel & state again under lock
- if (null != channel && State.CONNECTED == state) {
- doOpNow = true;
- } else {
- // if reached here, channel is either null (first connection attempt),
- // or the channel is disconnected. Connection attempt is still in progress,
- // queue up this op. Op will be executed when connection attempt either
- // fails or succeeds
- pendingOps.add(pubSubData);
- }
- }
- if (!doOpNow) {
- // start connection attempt to server
- connect();
- }
- }
- if (doOpNow) {
- executeOpAfterConnected(pubSubData);
- }
- }
-
- /**
- * Execute pub/sub operation after the underlying channel is connected.
- *
- * @param pubSubData
- * Pub/Sub Operation
- */
- private void executeOpAfterConnected(PubSubData pubSubData) {
- PubSubRequest.Builder reqBuilder =
- NetUtils.buildPubSubRequest(channelManager.nextTxnId(), pubSubData);
- writePubSubRequest(pubSubData, reqBuilder.build());
- }
-
- @Override
- public Channel getChannel() {
- return channel;
- }
-
- private void writePubSubRequest(PubSubData pubSubData, PubSubRequest pubSubRequest) {
- if (closed || null == channel || State.CONNECTED != state) {
- retryOrFailOp(pubSubData);
- return;
- }
-
- // Before we do the write, store this information into the
- // ResponseHandler so when the server responds, we know what
- // appropriate Callback Data to invoke for the given txn ID.
- try {
- getHChannelHandlerFromChannel(channel)
- .addTxn(pubSubData.txnId, pubSubData);
- } catch (NoResponseHandlerException nrhe) {
- logger.warn("No Channel Handler found for channel {} when writing request."
- + " It might already disconnect.", channel);
- return;
- }
-
- // Finally, write the pub/sub request through the Channel.
- logger.debug("Writing a {} request to host: {} for pubSubData: {}.",
- va(pubSubData.operationType, host, pubSubData));
- ChannelFuture future = channel.write(pubSubRequest);
- future.addListener(new WriteCallback(pubSubData, channelManager));
- }
-
- /**
- * Re-submit operation to default server or fail it.
- *
- * @param pubSubData
- * Pub/Sub Operation
- */
- protected void retryOrFailOp(PubSubData pubSubData) {
- // if we were not able to connect to the host, it could be down
- ByteString hostString = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
- if (pubSubData.connectFailedServers != null &&
- pubSubData.connectFailedServers.contains(hostString)) {
- // We've already tried to connect to this host before so just
- // invoke the operationFailed callback.
- logger.error("Error connecting to host {} more than once so fail the request: {}",
- va(host, pubSubData));
- pubSubData.getCallback().operationFailed(pubSubData.context,
- new CouldNotConnectException("Could not connect to host: " + host));
- } else {
- logger.error("Retry to connect to default hub server again for pubSubData: {}",
- pubSubData);
- // Keep track of this current server that we failed to connect
- // to but retry the request on the default server host/VIP.
- if (pubSubData.connectFailedServers == null) {
- pubSubData.connectFailedServers = new LinkedList<ByteString>();
- }
- pubSubData.connectFailedServers.add(hostString);
- channelManager.submitOpToDefaultServer(pubSubData);
- }
- }
-
- private void onChannelConnected(ChannelFuture future) {
- Queue<PubSubData> oldPendingOps;
- synchronized (this) {
- // if the channel is closed by client, do nothing
- if (closed) {
- future.getChannel().close();
- return;
- }
- state = State.CONNECTED;
- channel = future.getChannel();
- host = NetUtils.getHostFromChannel(channel);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<PubSubData>();
- }
- for (PubSubData op : oldPendingOps) {
- executeOpAfterConnected(op);
- }
- }
-
- private void onChannelConnectFailure() {
- Queue<PubSubData> oldPendingOps;
- synchronized (this) {
- state = State.DISCONNECTED;
- channel = null;
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<PubSubData>();
- }
- for (PubSubData op : oldPendingOps) {
- retryOrFailOp(op);
- }
- }
-
- private void connect() {
- synchronized (this) {
- if (State.CONNECTING == state ||
- State.CONNECTED == state) {
- return;
- }
- state = State.CONNECTING;
- }
- // Start the connection attempt to the input server host.
- ChannelFuture future = connect(host, pipelineFactory);
- future.addListener(new ChannelFutureListener() {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- // If the channel has been closed, there is no need to proceed with any
- // callback logic here.
- if (closed) {
- future.getChannel().close();
- return;
- }
-
- if (!future.isSuccess()) {
- logger.error("Error connecting to host {}.", host);
- future.getChannel().close();
-
- // if we were not able to connect to the host, it could be down.
- onChannelConnectFailure();
- return;
- }
- logger.debug("Connected to server {}.", host);
- // Now that we have connected successfully to the server, execute all queueing
- // requests.
- onChannelConnected(future);
- }
-
- });
- }
-
- /**
- * This is a helper method to do the connect attempt to the server given the
- * inputted host/port. This can be used to connect to the default server
- * host/port which is the VIP. That will pick a server in the cluster at
- * random to connect to for the initial PubSub attempt (with redirect logic
- * being done at the server side). Additionally, this could be called after
- * the client makes an initial PubSub attempt at a server, and is redirected
- * to the one that is responsible for the topic. Once the connect to the
- * server is done, we will perform the corresponding PubSub write on that
- * channel.
- *
- * @param serverHost
- * Input server host to connect to of type InetSocketAddress
- * @param pipelineFactory
- * PipelineFactory to create response handler to handle responses from
- * underlying channel.
- */
- protected ChannelFuture connect(InetSocketAddress serverHost,
- ClientChannelPipelineFactory pipelineFactory) {
- logger.debug("Connecting to host {} ...", serverHost);
- // Set up the ClientBootStrap so we can create a new Channel connection
- // to the server.
- ClientBootstrap bootstrap = new ClientBootstrap(channelManager.getChannelFactory());
- bootstrap.setPipelineFactory(pipelineFactory);
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
-
- // Start the connection attempt to the input server host.
- return bootstrap.connect(serverHost);
- }
-
- @Override
- public void close(boolean wait) {
- synchronized (this) {
- if (closed) {
- return;
- }
- closed = true;
- }
- if (null == channel) {
- return;
- }
- try {
- getHChannelHandlerFromChannel(channel).closeExplicitly();
- } catch (NoResponseHandlerException nrhe) {
- logger.warn("No channel handler found for channel {} when closing it.",
- channel);
- }
- if (wait) {
- channel.close().awaitUninterruptibly();
- } else {
- channel.close();
- }
- channel = null;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("[HChannel: host - ").append(host)
- .append(", channel - ").append(channel)
- .append(", pending reqs - ").append(pendingOps.size())
- .append(", closed - ").append(closed).append("]");
- return sb.toString();
- }
-
- @Override
- public void close() {
- close(false);
- }
-
- /**
- * Helper static method to get the ResponseHandler instance from a Channel
- * via the ChannelPipeline it is associated with. The assumption is that the
- * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
- *
- * @param channel
- * Channel we are retrieving the ResponseHandler instance for
- * @return ResponseHandler Instance tied to the Channel's Pipeline
- */
- public static HChannelHandler getHChannelHandlerFromChannel(Channel channel)
- throws NoResponseHandlerException {
- if (null == channel) {
- throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
- }
-
- HChannelHandler handler = (HChannelHandler) channel.getPipeline().getLast();
- if (null == handler) {
- throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
- }
- return handler;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
deleted file mode 100644
index a91bbf8..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.PublishResponseHandler;
-import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-public class NonSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
-
- public NonSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
- AbstractHChannelManager channelManager) {
- super(cfg, channelManager);
- }
-
- @Override
- protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
- Map<OperationType, AbstractResponseHandler> handlers =
- new HashMap<OperationType, AbstractResponseHandler>();
- handlers.put(OperationType.PUBLISH,
- new PublishResponseHandler(cfg, channelManager));
- handlers.put(OperationType.UNSUBSCRIBE,
- new UnsubscribeResponseHandler(cfg, channelManager));
- return handlers;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
deleted file mode 100644
index 17d2401..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ResubscribeException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * This class is used when a Subscribe channel gets disconnected and we attempt
- * to resubmit subscribe requests existed in that channel. Once the resubscribe
- * the topic is completed, we need to restart delivery for that topic.
- */
-class ResubscribeCallback implements Callback<ResponseBody> {
-
- private static final Logger logger = LoggerFactory.getLogger(ResubscribeCallback.class);
-
- // Private member variables
- private final TopicSubscriber origTopicSubscriber;
- private final PubSubData origSubData;
- private final AbstractHChannelManager channelManager;
- private final long retryWaitTime;
-
- // Constructor
- ResubscribeCallback(TopicSubscriber origTopicSubscriber,
- PubSubData origSubData,
- AbstractHChannelManager channelManager,
- long retryWaitTime) {
- this.origTopicSubscriber = origTopicSubscriber;
- this.origSubData = origSubData;
- this.channelManager = channelManager;
- this.retryWaitTime = retryWaitTime;
- }
-
- @Override
- public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
- if (logger.isDebugEnabled())
- logger.debug("Resubscribe succeeded for origSubData: " + origSubData);
- // Now we want to restart delivery for the subscription channel only
- // if delivery was started at the time the original subscribe channel
- // was disconnected.
- try {
- channelManager.restartDelivery(origTopicSubscriber);
- } catch (ClientNotSubscribedException e) {
- // This exception should never be thrown here but just in case,
- // log an error and just keep retrying the subscribe request.
- logger.error("Subscribe was successful but error starting delivery for {} : {}",
- va(origTopicSubscriber, e.getMessage()));
- retrySubscribeRequest();
- } catch (AlreadyStartDeliveryException asde) {
- // should not reach here
- }
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (exception instanceof ResubscribeException) {
- // it might be caused by closesub when resubscribing.
- // so we don't need to retry resubscribe again
- logger.warn("Failed to resubscribe {} : but it is caused by closesub when resubscribing. "
- + "so we don't need to retry subscribe again.", origSubData);
- }
- // If the resubscribe fails, just keep retrying the subscribe
- // request. There isn't a way to flag to the application layer that
- // a topic subscription has failed. So instead, we'll just keep
- // retrying in the background until success.
- logger.error("Resubscribe failed with error: " + exception.getMessage());
- // we don't retry subscribe request is channel manager is closing
- // otherwise it might overflow the stack.
- if (!channelManager.isClosed()) {
- retrySubscribeRequest();
- }
- }
-
- private void retrySubscribeRequest() {
- if (channelManager.isClosed()) {
- return;
- }
- origSubData.clearServersList();
- logger.debug("Resubmit subscribe request for {} in {} ms later.",
- va(origTopicSubscriber, retryWaitTime));
- channelManager.submitOpAfterDelay(origSubData, retryWaitTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
deleted file mode 100644
index c7a71cb..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class WriteCallback implements ChannelFutureListener {
-
- private static final Logger logger = LoggerFactory.getLogger(WriteCallback.class);
-
- // Private member variables
- private PubSubData pubSubData;
- private final HChannelManager channelManager;
-
- // Constructor
- public WriteCallback(PubSubData pubSubData,
- HChannelManager channelManager) {
- super();
- this.pubSubData = pubSubData;
- this.channelManager = channelManager;
- }
-
- public void operationComplete(ChannelFuture future) throws Exception {
- // If the client has stopped, there is no need to proceed
- // with any callback logic here.
- if (channelManager.isClosed()) {
- future.getChannel().close();
- return;
- }
-
- // When the write operation to the server is done, we just need to check
- // if it was successful or not.
- InetSocketAddress host = NetUtils.getHostFromChannel(future.getChannel());
- if (!future.isSuccess()) {
- logger.error("Error writing on channel to host: {}", host);
- // On a write failure for a PubSubRequest, we also want to remove
- // the saved txnId to PubSubData in the ResponseHandler. These
- // requests will not receive an ack response from the server
- // so there is no point storing that information there anymore.
- try {
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(future.getChannel());
- channelHandler.removeTxn(pubSubData.txnId);
- channelHandler.closeExplicitly();
- } catch (NoResponseHandlerException e) {
- // We just couldn't remove the transaction ID's mapping.
- // The handler was null, so this has been reset anyway.
- logger.warn("Could not find response handler to remove txnId mapping to pubsub data. Ignoring.");
- }
-
- future.getChannel().close();
-
- // If we were not able to write on the channel to the server host,
- // the host could have died or something is wrong with the channel
- // connection where we can connect to the host, but not write to it.
- ByteString hostString = (host == null) ? null : ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
- if (pubSubData.writeFailedServers != null && pubSubData.writeFailedServers.contains(hostString)) {
- // We've already tried to write to this server previously and
- // failed, so invoke the operationFailed callback.
- logger.error("Error writing to host more than once so just invoke the operationFailed callback!");
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
- "Error while writing message to server: " + hostString));
- } else {
- logger.debug("Try to send the PubSubRequest again to the default server host/VIP for pubSubData: {}",
- pubSubData);
- // Keep track of this current server that we failed to write to
- // but retry the request on the default server host/VIP.
- if (pubSubData.writeFailedServers == null)
- pubSubData.writeFailedServers = new LinkedList<ByteString>();
- pubSubData.writeFailedServers.add(hostString);
- channelManager.submitOpToDefaultServer(pubSubData);
- }
- } else {
- // Now that the write to the server is done, we have to wait for it
- // to respond. The ResponseHandler will take care of the ack
- // response from the server before we can determine if the async
- // PubSub call has really completed successfully or not.
- logger.debug("Successfully wrote to host: {} for pubSubData: {}", host, pubSubData);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
deleted file mode 100644
index caa0734..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.multiplex;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.client.netty.CleanupChannelMap;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.client.netty.impl.HChannelHandler;
-import org.apache.hedwig.client.netty.impl.HChannelImpl;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-
-import static org.apache.hedwig.util.VarArgs.va;
-
-
-/**
- * Multiplex HChannel Manager which establish a connection for multi subscriptions.
- */
-public class MultiplexHChannelManager extends AbstractHChannelManager {
-
- static final Logger logger = LoggerFactory.getLogger(MultiplexHChannelManager.class);
-
- // Find which HChannel that a given TopicSubscriber used.
- protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;
-
- // A index map for each topic subscriber is served by which subscription channel
- protected final CleanupChannelMap<TopicSubscriber> sub2Channels;
-
- // Concurrent Map to store Message handler for each topic + sub id combination.
- // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
- // user set when connection is recovered
- protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
- = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
-
- // PipelineFactory to create subscription netty channels to the appropriate server
- private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
-
- public MultiplexHChannelManager(ClientConfiguration cfg,
- ChannelFactory socketFactory) {
- super(cfg, socketFactory);
- subscriptionChannels = new CleanupChannelMap<InetSocketAddress>();
- sub2Channels = new CleanupChannelMap<TopicSubscriber>();
- subscriptionChannelPipelineFactory =
- new MultiplexSubscriptionChannelPipelineFactory(cfg, this);
- }
-
- @Override
- protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
- return subscriptionChannelPipelineFactory;
- }
-
- @Override
- protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
- // store the channel connected to target host for future usage
- InetSocketAddress host = NetUtils.getHostFromChannel(channel);
- HChannel newHChannel = new HChannelImpl(host, channel, this,
- getSubscriptionChannelPipelineFactory());
- return storeSubscriptionChannel(host, newHChannel);
- }
-
- @Override
- protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
- HChannel newHChannel = new HChannelImpl(host, this,
- getSubscriptionChannelPipelineFactory());
- return storeSubscriptionChannel(host, newHChannel);
- }
-
- private HChannel storeSubscriptionChannel(InetSocketAddress host,
- HChannel newHChannel) {
- // here, we guarantee there is only one channel used to communicate with target
- // host.
- return subscriptionChannels.addChannel(host, newHChannel);
- }
-
- @Override
- protected HChannel getSubscriptionChannel(InetSocketAddress host) {
- return subscriptionChannels.getChannel(host);
- }
-
- protected HChannel getSubscriptionChannel(TopicSubscriber subscriber) {
- InetSocketAddress host = topic2Host.get(subscriber.getTopic());
- if (null == host) {
- // we don't know where is the owner of the topic
- return null;
- } else {
- return getSubscriptionChannel(host);
- }
- }
-
- @Override
- protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
- InetSocketAddress host = topic2Host.get(subscriber.getTopic());
- if (null == host) {
- // we don't know where is the topic
- return null;
- } else {
- // we had know which server owned the topic
- HChannel channel = getSubscriptionChannel(host);
- if (null == channel) {
- // create a channel to connect to sepcified host
- channel = createAndStoreSubscriptionChannel(host);
- }
- return channel;
- }
- }
-
- @Override
- protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
- Channel channel) {
- HChannel hChannel = subscriptionChannels.getChannel(host);
- if (null == hChannel) {
- return;
- }
- Channel underlyingChannel = hChannel.getChannel();
- if (null == underlyingChannel ||
- !underlyingChannel.equals(channel)) {
- return;
- }
- logger.info("Subscription Channel {} disconnected from {}.",
- va(channel, host));
- // remove existed channel
- if (subscriptionChannels.removeChannel(host, hChannel)) {
- try {
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(channel);
- channelHandler.getSubscribeResponseHandler()
- .onChannelDisconnected(host, channel);
- } catch (NoResponseHandlerException nrhe) {
- logger.warn("No Channel Handler found for channel {} when it disconnected.",
- channel);
- }
- }
- }
-
- @Override
- public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
- HChannel hChannel = getSubscriptionChannel(topicSubscriber);
- if (null == hChannel) {
- return null;
- }
- Channel channel = hChannel.getChannel();
- if (null == channel) {
- return null;
- }
- try {
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(channel);
- return channelHandler.getSubscribeResponseHandler();
- } catch (NoResponseHandlerException nrhe) {
- logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
- channel, topicSubscriber);
- return null;
- }
- }
-
- @Override
- public void startDelivery(TopicSubscriber topicSubscriber,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- startDelivery(topicSubscriber, messageHandler, false);
- }
-
- @Override
- protected void restartDelivery(TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- startDelivery(topicSubscriber, null, true);
- }
-
- private void startDelivery(TopicSubscriber topicSubscriber,
- MessageHandler messageHandler,
- boolean restart)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- // Make sure we know about this topic subscription on the client side
- // exists. The assumption is that the client should have in memory the
- // Channel created for the TopicSubscriber once the server has sent
- // an ack response to the initial subscribe request.
- SubscribeResponseHandler subscribeResponseHandler =
- getSubscribeResponseHandler(topicSubscriber);
- if (null == subscribeResponseHandler ||
- !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
- logger.error("Client is not yet subscribed to {}.", topicSubscriber);
- throw new ClientNotSubscribedException("Client is not yet subscribed to "
- + topicSubscriber);
- }
-
- MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
- if (restart) {
- // restart using existing msg handler
- messageHandler = existedMsgHandler;
- } else {
- // some has started delivery but not stop it
- if (null != existedMsgHandler) {
- throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
- }
- if (messageHandler != null) {
- if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
- throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
- }
- }
- }
-
- // tell subscribe response handler to start delivering messages for topicSubscriber
- subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
- }
-
- public void stopDelivery(TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException {
- // Make sure we know that this topic subscription on the client side
- // exists. The assumption is that the client should have in memory the
- // Channel created for the TopicSubscriber once the server has sent
- // an ack response to the initial subscribe request.
- SubscribeResponseHandler subscribeResponseHandler =
- getSubscribeResponseHandler(topicSubscriber);
- if (null == subscribeResponseHandler ||
- !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
- logger.error("Client is not yet subscribed to {}.", topicSubscriber);
- throw new ClientNotSubscribedException("Client is not yet subscribed to "
- + topicSubscriber);
- }
-
- // tell subscribe response handler to stop delivering messages for a given topic subscriber
- topicSubscriber2MessageHandler.remove(topicSubscriber);
- subscribeResponseHandler.stopDelivery(topicSubscriber);
- }
-
-
- @Override
- public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
- final Callback<ResponseBody> callback,
- final Object context) {
- SubscribeResponseHandler subscribeResponseHandler =
- getSubscribeResponseHandler(topicSubscriber);
- if (null == subscribeResponseHandler ||
- !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
- logger.warn("Trying to close a subscription when we don't have a subscription channel cached for {}",
- topicSubscriber);
- callback.operationFinished(context, (ResponseBody)null);
- return;
- }
- subscribeResponseHandler.asyncCloseSubscription(topicSubscriber, callback, context);
- }
-
- @Override
- protected void checkTimeoutRequestsOnSubscriptionChannels() {
- // timeout task may be started before constructing subscriptionChannels
- if (null == subscriptionChannels) {
- return;
- }
- for (HChannel channel : subscriptionChannels.getChannels()) {
- try {
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
- channelHandler.checkTimeoutRequests();
- } catch (NoResponseHandlerException nrhe) {
- continue;
- }
- }
- }
-
- @Override
- protected void closeSubscriptionChannels() {
- subscriptionChannels.close();
- }
-
- protected Either<Boolean, HChannel> storeSubscriptionChannel(
- TopicSubscriber topicSubscriber, PubSubData txn, HChannel channel) {
- boolean replaced = sub2Channels.replaceChannel(
- topicSubscriber, txn.getOriginalChannelForResubscribe(), channel);
- if (replaced) {
- return Either.of(replaced, channel);
- } else {
- return Either.of(replaced, null);
- }
- }
-
- protected boolean removeSubscriptionChannel(
- TopicSubscriber topicSubscriber, HChannel channel) {
- return sub2Channels.removeChannel(topicSubscriber, channel);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
deleted file mode 100644
index 9b8ebe0..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.multiplex;
-
-import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
-import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public class MultiplexSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
-
- private static final Logger logger =
- LoggerFactory.getLogger(MultiplexSubscribeResponseHandler.class);
-
- // the underlying subscription channel
- volatile HChannel hChannel;
- private final MultiplexHChannelManager sChannelManager;
-
- protected MultiplexSubscribeResponseHandler(ClientConfiguration cfg,
- HChannelManager channelManager) {
- super(cfg, channelManager);
- sChannelManager = (MultiplexHChannelManager) channelManager;
- }
-
- @Override
- public void handleResponse(PubSubResponse response, PubSubData pubSubData,
- Channel channel) throws Exception {
- if (null == hChannel) {
- InetSocketAddress host = NetUtils.getHostFromChannel(channel);
- hChannel = sChannelManager.getSubscriptionChannel(host);
- if (null == hChannel ||
- !channel.equals(hChannel.getChannel())) {
- PubSubException pse =
- new UnexpectedConditionException("Failed to get subscription channel of " + host);
- pubSubData.getCallback().operationFailed(pubSubData.context, pse);
- return;
- }
- }
- super.handleResponse(response, pubSubData, channel);
- }
-
- @Override
- protected Either<StatusCode, HChannel> handleSuccessResponse(
- TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
- // Store the mapping for the TopicSubscriber to the Channel.
- // This is so we can control the starting and stopping of
- // message deliveries from the server on that Channel. Store
- // this only on a successful ack response from the server.
- Either<Boolean, HChannel> result =
- sChannelManager.storeSubscriptionChannel(ts, pubSubData, hChannel);
- if (result.left()) {
- return Either.of(StatusCode.SUCCESS, result.right());
- } else {
- StatusCode code;
- if (pubSubData.isResubscribeRequest()) {
- code = StatusCode.RESUBSCRIBE_EXCEPTION;
- } else {
- code = StatusCode.CLIENT_ALREADY_SUBSCRIBED;
- }
- return Either.of(code, null);
- }
- }
-
- @Override
- public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
- final Callback<ResponseBody> callback,
- final Object context) {
- final ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
- if (null == ss || null == hChannel) {
- logger.debug("No subscription {} found when closing its subscription from {}.",
- va(topicSubscriber, hChannel));
- callback.operationFinished(context, (ResponseBody)null);
- return;
- }
- Callback<ResponseBody> closeCb = new Callback<ResponseBody>() {
- @Override
- public void operationFinished(Object ctx, ResponseBody respBody) {
- removeSubscription(topicSubscriber, ss);
- sChannelManager.removeSubscriptionChannel(topicSubscriber, hChannel);
- callback.operationFinished(context, null);
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- callback.operationFailed(context, exception);
- }
- };
- PubSubData closeOp = new PubSubData(topicSubscriber.getTopic(), null,
- topicSubscriber.getSubscriberId(),
- OperationType.CLOSESUBSCRIPTION,
- null, closeCb, context);
- hChannel.submitOp(closeOp);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
deleted file mode 100644
index c43108a..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.multiplex;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.client.netty.impl.HChannelHandler;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-public class MultiplexSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
-
- public MultiplexSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
- MultiplexHChannelManager channelManager) {
- super(cfg, channelManager);
- }
-
- @Override
- protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
- Map<OperationType, AbstractResponseHandler> handlers =
- new HashMap<OperationType, AbstractResponseHandler>();
- handlers.put(OperationType.SUBSCRIBE,
- new MultiplexSubscribeResponseHandler(cfg, channelManager));
- handlers.put(OperationType.CLOSESUBSCRIPTION,
- new CloseSubscriptionResponseHandler(cfg, channelManager));
- return handlers;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
deleted file mode 100644
index fb0a7d9..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.simple;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-import org.apache.hedwig.client.netty.CleanupChannelMap;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.client.netty.impl.HChannelHandler;
-import org.apache.hedwig.client.netty.impl.HChannelImpl;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * Simple HChannel Manager which establish a connection for each subscription.
- */
-public class SimpleHChannelManager extends AbstractHChannelManager {
-
- private static final Logger logger = LoggerFactory.getLogger(SimpleHChannelManager.class);
-
- // Concurrent Map to store the cached Channel connections on the client side
- // to a server host for a given Topic + SubscriberId combination. For each
- // TopicSubscriber, we want a unique Channel connection to the server for
- // it. We can also get the ResponseHandler tied to the Channel via the
- // Channel Pipeline.
- protected final CleanupChannelMap<TopicSubscriber> topicSubscriber2Channel;
-
- // Concurrent Map to store Message handler for each topic + sub id combination.
- // Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
- // user set when connection is recovered
- protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
- = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
-
- // PipelineFactory to create subscription netty channels to the appropriate server
- private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;
-
- public SimpleHChannelManager(ClientConfiguration cfg,
- ChannelFactory socketFactory) {
- super(cfg, socketFactory);
- topicSubscriber2Channel = new CleanupChannelMap<TopicSubscriber>();
- this.subscriptionChannelPipelineFactory =
- new SimpleSubscriptionChannelPipelineFactory(cfg, this);
- }
-
- @Override
- public void submitOp(final PubSubData pubSubData) {
- /**
- * In the simple hchannel implementation that if a client closes a subscription
- * and tries to attach to it immediately, it could get a TOPIC_BUSY response. This
- * is because, a subscription is closed simply by closing the channel, and the hub
- * side may not have been notified of the channel disconnection event by the time
- * the new subscription request comes in. To solve this, retry up to 5 times.
- * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-513}
- */
- if (OperationType.SUBSCRIBE.equals(pubSubData.operationType)) {
- final Callback<ResponseBody> origCb = pubSubData.getCallback();
- final AtomicInteger retries = new AtomicInteger(5);
- final Callback<ResponseBody> wrapperCb
- = new Callback<ResponseBody>() {
- @Override
- public void operationFinished(Object ctx,
- ResponseBody resultOfOperation) {
- origCb.operationFinished(ctx, resultOfOperation);
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (exception instanceof ServiceDownException
- && exception.getCause() instanceof TopicBusyException
- && retries.decrementAndGet() > 0) {
- logger.warn("TOPIC_DOWN from server using simple channel scheme."
- + "This could be due to the channel disconnection from a close"
- + " not having been triggered on the server side. Retrying");
- SimpleHChannelManager.super.submitOp(pubSubData);
- return;
- }
- origCb.operationFailed(ctx, exception);
- }
- };
- pubSubData.setCallback(wrapperCb);
- }
- super.submitOp(pubSubData);
- }
-
- @Override
- protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
- return subscriptionChannelPipelineFactory;
- }
-
- @Override
- protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
- // for simple channel, we don't store subscription channel now
- // we store it until we received success response
- InetSocketAddress host = NetUtils.getHostFromChannel(channel);
- return new HChannelImpl(host, channel, this,
- getSubscriptionChannelPipelineFactory());
- }
-
- @Override
- protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
- // for simple channel, we don't store subscription channel now
- // we store it until we received success response
- return new HChannelImpl(host, this,
- getSubscriptionChannelPipelineFactory());
- }
-
- protected Either<Boolean, HChannel> storeSubscriptionChannel(
- TopicSubscriber topicSubscriber, PubSubData txn, Channel channel) {
- InetSocketAddress host = NetUtils.getHostFromChannel(channel);
- HChannel newHChannel = new HChannelImpl(host, channel, this,
- getSubscriptionChannelPipelineFactory());
- boolean replaced = topicSubscriber2Channel.replaceChannel(
- topicSubscriber, txn.getOriginalChannelForResubscribe(), newHChannel);
- if (replaced) {
- return Either.of(replaced, newHChannel);
- } else {
- return Either.of(replaced, null);
- }
- }
-
- @Override
- protected HChannel getSubscriptionChannel(InetSocketAddress host) {
- return null;
- }
-
- @Override
- protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
- HChannel channel = topicSubscriber2Channel.getChannel(subscriber);
- if (null != channel) {
- // there is no channel established for this subscription
- return channel;
- } else {
- InetSocketAddress host = topic2Host.get(subscriber.getTopic());
- if (null == host) {
- return null;
- } else {
- channel = getSubscriptionChannel(host);
- if (null == channel) {
- channel = createAndStoreSubscriptionChannel(host);
- }
- return channel;
- }
- }
- }
-
- @Override
- protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
- Channel channel) {
- logger.info("Subscription Channel {} disconnected from {}.",
- va(channel, host));
- try {
- // get hchannel handler
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(channel);
- channelHandler.getSubscribeResponseHandler()
- .onChannelDisconnected(host, channel);
- } catch (NoResponseHandlerException nrhe) {
- logger.warn("No Channel Handler found for channel {} when it disconnected.",
- channel);
- }
- }
-
- @Override
- public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
- HChannel hChannel = topicSubscriber2Channel.getChannel(topicSubscriber);
- if (null == hChannel) {
- return null;
- }
- Channel channel = hChannel.getChannel();
- if (null == channel) {
- return null;
- }
- try {
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(channel);
- return channelHandler.getSubscribeResponseHandler();
- } catch (NoResponseHandlerException nrhe) {
- logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
- channel, topicSubscriber);
- return null;
- }
-
- }
-
- @Override
- public void startDelivery(TopicSubscriber topicSubscriber,
- MessageHandler messageHandler)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- startDelivery(topicSubscriber, messageHandler, false);
- }
-
- @Override
- protected void restartDelivery(TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- startDelivery(topicSubscriber, null, true);
- }
-
- private void startDelivery(TopicSubscriber topicSubscriber,
- MessageHandler messageHandler, boolean restart)
- throws ClientNotSubscribedException, AlreadyStartDeliveryException {
- // Make sure we know about this topic subscription on the client side
- // exists. The assumption is that the client should have in memory the
- // Channel created for the TopicSubscriber once the server has sent
- // an ack response to the initial subscribe request.
- SubscribeResponseHandler subscribeResponseHandler =
- getSubscribeResponseHandler(topicSubscriber);
- if (null == subscribeResponseHandler ||
- !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
- logger.error("Client is not yet subscribed to {}.", topicSubscriber);
- throw new ClientNotSubscribedException("Client is not yet subscribed to "
- + topicSubscriber);
- }
-
- MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
- if (restart) {
- // restart using existing msg handler
- messageHandler = existedMsgHandler;
- } else {
- // some has started delivery but not stop it
- if (null != existedMsgHandler) {
- throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
- }
- if (messageHandler != null) {
- if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
- throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
- }
- }
- }
-
- // tell subscribe response handler to start delivering messages for topicSubscriber
- subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
- }
-
- public void stopDelivery(TopicSubscriber topicSubscriber)
- throws ClientNotSubscribedException {
- // Make sure we know that this topic subscription on the client side
- // exists. The assumption is that the client should have in memory the
- // Channel created for the TopicSubscriber once the server has sent
- // an ack response to the initial subscribe request.
- SubscribeResponseHandler subscribeResponseHandler =
- getSubscribeResponseHandler(topicSubscriber);
- if (null == subscribeResponseHandler ||
- !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
- logger.error("Client is not yet subscribed to {}.", topicSubscriber);
- throw new ClientNotSubscribedException("Client is not yet subscribed to "
- + topicSubscriber);
- }
-
- // tell subscribe response handler to stop delivering messages for a given topic subscriber
- topicSubscriber2MessageHandler.remove(topicSubscriber);
- subscribeResponseHandler.stopDelivery(topicSubscriber);
- }
-
-
- @Override
- public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
- final Callback<ResponseBody> callback,
- final Object context) {
- HChannel hChannel = topicSubscriber2Channel.removeChannel(topicSubscriber);
- if (null == hChannel) {
- logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for {}",
- topicSubscriber);
- callback.operationFinished(context, (ResponseBody)null);
- return;
- }
-
- Channel channel = hChannel.getChannel();
- if (null == channel) {
- callback.operationFinished(context, (ResponseBody)null);
- return;
- }
-
- try {
- HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
- } catch (NoResponseHandlerException nrhe) {
- logger.warn("No Channel Handler found when closing {}'s channel {}.",
- channel, topicSubscriber);
- }
- ChannelFuture future = channel.close();
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Failed to close the subscription channel for {}",
- topicSubscriber);
- callback.operationFailed(context, new ServiceDownException(
- "Failed to close the subscription channel for " + topicSubscriber));
- } else {
- callback.operationFinished(context, (ResponseBody)null);
- }
- }
- });
- }
-
- @Override
- protected void checkTimeoutRequestsOnSubscriptionChannels() {
- // timeout task may be started before constructing topicSubscriber2Channel
- if (null == topicSubscriber2Channel) {
- return;
- }
- for (HChannel channel : topicSubscriber2Channel.getChannels()) {
- try {
- HChannelHandler channelHandler =
- HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
- channelHandler.checkTimeoutRequests();
- } catch (NoResponseHandlerException nrhe) {
- continue;
- }
- }
- }
-
- @Override
- protected void closeSubscriptionChannels() {
- topicSubscriber2Channel.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
deleted file mode 100644
index ee5bd90..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.simple;
-
-
-import java.util.Set;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
-import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
-import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
-import org.apache.hedwig.client.netty.impl.HChannelImpl;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.Either;
-
-public class SimpleSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class);
-
- /**
- * Simple Active Subscriber enabling client-side throttling.
- */
- static class SimpleActiveSubscriber extends ActiveSubscriber {
-
- // Set to store all of the outstanding subscribed messages that are pending
- // to be consumed by the client app's MessageHandler. If this ever grows too
- // big (e.g. problem at the client end for message consumption), we can
- // throttle things by temporarily setting the Subscribe Netty Channel
- // to not be readable. When the Set has shrunk sufficiently, we can turn the
- // channel back on to read new messages.
- private final Set<Message> outstandingMsgSet;
-
- public SimpleActiveSubscriber(ClientConfiguration cfg,
- AbstractHChannelManager channelManager,
- TopicSubscriber ts, PubSubData op,
- SubscriptionPreferences preferences,
- Channel channel,
- HChannel hChannel) {
- super(cfg, channelManager, ts, op, preferences, channel, hChannel);
- outstandingMsgSet = Collections.newSetFromMap(
- new ConcurrentHashMap<Message, Boolean>(
- cfg.getMaximumOutstandingMessages(), 1.0f));
- }
-
- @Override
- protected void unsafeDeliverMessage(Message message) {
- // Add this "pending to be consumed" message to the outstandingMsgSet.
- outstandingMsgSet.add(message);
- // Check if we've exceeded the max size for the outstanding message set.
- if (outstandingMsgSet.size() >= cfg.getMaximumOutstandingMessages() &&
- channel.isReadable()) {
- // Too many outstanding messages so throttle it by setting the Netty
- // Channel to not be readable.
- if (logger.isDebugEnabled()) {
- logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel",
- outstandingMsgSet.size());
- }
- channel.setReadable(false);
- }
- super.unsafeDeliverMessage(message);
- }
-
- @Override
- public synchronized void messageConsumed(Message message) {
- super.messageConsumed(message);
- // Remove this consumed message from the outstanding Message Set.
- outstandingMsgSet.remove(message);
- // Check if we throttled message consumption previously when the
- // outstanding message limit was reached. For now, only turn the
- // delivery back on if there are no more outstanding messages to
- // consume. We could make this a configurable parameter if needed.
- if (!channel.isReadable() && outstandingMsgSet.size() == 0) {
- if (logger.isDebugEnabled())
- logger.debug("Message consumption has caught up so okay to turn off"
- + " throttling of messages on the subscribe channel for {}",
- topicSubscriber);
- channel.setReadable(true);
- }
- }
-
- @Override
- public synchronized void startDelivery(MessageHandler messageHandler)
- throws AlreadyStartDeliveryException, ClientNotSubscribedException {
- super.startDelivery(messageHandler);
- // Now make the TopicSubscriber Channel readable (it is set to not be
- // readable when the initial subscription is done). Note that this is an
- // asynchronous call. If this fails (not likely), the futureListener
- // will just log an error message for now.
- ChannelFuture future = channel.setReadable(true);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Unable to make subscriber Channel readable in startDelivery call for {}",
- topicSubscriber);
- }
- }
- });
- }
-
- @Override
- public synchronized void stopDelivery() {
- super.stopDelivery();
- // Now make the TopicSubscriber channel not-readable. This will buffer
- // up messages if any are sent from the server. Note that this is an
- // asynchronous call. If this fails (not likely), the futureListener
- // will just log an error message for now.
- ChannelFuture future = channel.setReadable(false);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.error("Unable to make subscriber Channel not readable in stopDelivery call for {}",
- topicSubscriber);
- }
- }
- });
- }
-
- }
-
- // Track which subscriber is alive in this response handler
- // Which is used for backward compat, since old version hub
- // server doesn't carry (topic, subscriberid) in each message.
- private volatile TopicSubscriber origTopicSubscriber;
- private volatile ActiveSubscriber origActiveSubscriber;
-
- private SimpleHChannelManager sChannelManager;
-
- protected SimpleSubscribeResponseHandler(ClientConfiguration cfg,
- HChannelManager channelManager) {
- super(cfg, channelManager);
- sChannelManager = (SimpleHChannelManager) channelManager;
- }
-
- @Override
- protected ActiveSubscriber createActiveSubscriber(
- ClientConfiguration cfg, AbstractHChannelManager channelManager,
- TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
- Channel channel, HChannel hChannel) {
- return new SimpleActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
- }
-
- @Override
- protected synchronized ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
- if (null == origTopicSubscriber || !origTopicSubscriber.equals(ts)) {
- return null;
- }
- return origActiveSubscriber;
- }
-
- private synchronized ActiveSubscriber getActiveSubscriber() {
- return origActiveSubscriber;
- }
-
- @Override
- public synchronized boolean hasSubscription(TopicSubscriber ts) {
- if (null == origTopicSubscriber) {
- return false;
- }
- return origTopicSubscriber.equals(ts);
- }
-
- @Override
- protected synchronized boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
- if (null != origTopicSubscriber && !origTopicSubscriber.equals(ts)) {
- return false;
- }
- origTopicSubscriber = null;
- origActiveSubscriber = null;
- return super.removeSubscription(ts, ss);
- }
-
- @Override
- public void handleResponse(PubSubResponse response, PubSubData pubSubData,
- Channel channel) throws Exception {
- // If this was not a successful response to the Subscribe request, we
- // won't be using the Netty Channel created so just close it.
- if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
- HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
- channel.close();
- }
- super.handleResponse(response, pubSubData, channel);
- }
-
- @Override
- public void handleSubscribeMessage(PubSubResponse response) {
- Message message = response.getMessage();
- ActiveSubscriber ss = getActiveSubscriber();
- if (null == ss) {
- logger.error("No Subscriber is alive receiving its message {}.",
- MessageIdUtils.msgIdToReadableString(message.getMsgId()));
- return;
- }
- ss.handleMessage(message);
- }
-
- @Override
- protected Either<StatusCode, HChannel> handleSuccessResponse(
- TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
- // Store the mapping for the TopicSubscriber to the Channel.
- // This is so we can control the starting and stopping of
- // message deliveries from the server on that Channel. Store
- // this only on a successful ack response from the server.
- Either<Boolean, HChannel> result =
- sChannelManager.storeSubscriptionChannel(ts, pubSubData, channel);
- if (result.left()) {
- return Either.of(StatusCode.SUCCESS, result.right());
- } else {
- StatusCode code;
- if (pubSubData.isResubscribeRequest()) {
- code = StatusCode.RESUBSCRIBE_EXCEPTION;
- } else {
- code = StatusCode.CLIENT_ALREADY_SUBSCRIBED;
- }
- return Either.of(code, null);
- }
- }
-
- @Override
- protected synchronized void postHandleSuccessResponse(
- TopicSubscriber ts, ActiveSubscriber as) {
- origTopicSubscriber = ts;
- origActiveSubscriber = as;
- }
-
- @Override
- public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
- final Callback<ResponseBody> callback,
- final Object context) {
- // nothing to do just clear status
- // channel manager takes the responsibility to close the channel
- callback.operationFinished(context, (ResponseBody)null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
deleted file mode 100644
index d14f053..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl.simple;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
-import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-public class SimpleSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
-
- public SimpleSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
- SimpleHChannelManager channelManager) {
- super(cfg, channelManager);
- }
-
- @Override
- protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
- Map<OperationType, AbstractResponseHandler> handlers =
- new HashMap<OperationType, AbstractResponseHandler>();
- handlers.put(OperationType.SUBSCRIBE,
- new SimpleSubscribeResponseHandler(cfg, channelManager));
- handlers.put(OperationType.CLOSESUBSCRIPTION,
- new CloseSubscriptionResponseHandler(cfg, channelManager));
- return handlers;
- }
-
-}