You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/22 12:10:38 UTC
svn commit: r1400827 - in /zookeeper/bookkeeper/trunk: ./
hedwig-server/src/main/java/org/apache/hedwig/server/handlers/
hedwig-server/src/main/java/org/apache/hedwig/server/netty/
hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ hedwig-serv...
Author: ivank
Date: Mon Oct 22 10:10:38 2012
New Revision: 1400827
URL: http://svn.apache.org/viewvc?rev=1400827&view=rev
Log:
BOOKKEEPER-435: Create SubscriptionChannelManager to manage all subscription channel (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct 22 10:10:38 2012
@@ -186,6 +186,8 @@ Trunk (unreleased changes)
BOOKKEEPER-422: Simplify AbstractSubscriptionManager (stu via fpj)
+ BOOKKEEPER-435: Create SubscriptionChannelManager to manage all subscription channel (sijie via ivank)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java Mon Oct 22 10:10:38 2012
@@ -18,19 +18,15 @@
package org.apache.hedwig.server.handlers;
-import java.util.Map;
-
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
public class NettyHandlerBean implements NettyHandlerMXBean, HedwigMBeanInfo {
- Map<OperationType, Handler> handlers;
- SubscribeHandler subHandler;
+ SubscriptionChannelManager subChannelMgr;
- public NettyHandlerBean(Map<OperationType, Handler> handlers) {
- this.handlers = handlers;
- subHandler = (SubscribeHandler) this.handlers.get(OperationType.SUBSCRIBE);
+ public NettyHandlerBean(SubscriptionChannelManager subChannelMgr) {
+ this.subChannelMgr = subChannelMgr;
}
@Override
@@ -45,7 +41,7 @@ public class NettyHandlerBean implements
@Override
public int getNumSubscriptionChannels() {
- return subHandler.sub2Channel.size();
+ return subChannelMgr.getNumSubscriptionChannels();
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Mon Oct 22 10:10:38 2012
@@ -17,8 +17,6 @@
*/
package org.apache.hedwig.server.handlers;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
@@ -54,53 +52,32 @@ import org.apache.hedwig.server.subscrip
import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
-public class SubscribeHandler extends BaseHandler implements ChannelDisconnectListener {
+public class SubscribeHandler extends BaseHandler {
static Logger logger = LoggerFactory.getLogger(SubscribeHandler.class);
- private DeliveryManager deliveryMgr;
- private PersistenceManager persistenceMgr;
- private SubscriptionManager subMgr;
- ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
- ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;
+ private final DeliveryManager deliveryMgr;
+ private final PersistenceManager persistenceMgr;
+ private final SubscriptionManager subMgr;
+ private final SubscriptionChannelManager subChannelMgr;
+
// op stats
private final OpStats subStats;
- private static ChannelFutureListener CLOSE_OLD_CHANNEL_LISTENER = new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.warn("Failed to close old subscription channel.");
- } else {
- logger.debug("Close old subscription channel succeed.");
- }
- }
- };
-
- public SubscribeHandler(TopicManager topicMgr, DeliveryManager deliveryManager, PersistenceManager persistenceMgr,
- SubscriptionManager subMgr, ServerConfiguration cfg) {
+ public SubscribeHandler(ServerConfiguration cfg, TopicManager topicMgr,
+ DeliveryManager deliveryManager,
+ PersistenceManager persistenceMgr,
+ SubscriptionManager subMgr,
+ SubscriptionChannelManager subChannelMgr) {
super(topicMgr, cfg);
this.deliveryMgr = deliveryManager;
this.persistenceMgr = persistenceMgr;
this.subMgr = subMgr;
- sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
- channel2sub = new ConcurrentHashMap<Channel, TopicSubscriber>();
+ this.subChannelMgr = subChannelMgr;
subStats = ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE);
}
- public void channelDisconnected(Channel channel) {
- // Evils of synchronized programming: there is a race between a channel
- // getting disconnected, and us adding it to the maps when a subscribe
- // succeeds
- synchronized (channel) {
- TopicSubscriber topicSub = channel2sub.remove(channel);
- if (topicSub != null) {
- // remove entry only currently mapped to given value.
- sub2Channel.remove(topicSub, channel);
- }
- }
- }
-
@Override
public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
@@ -192,56 +169,20 @@ public class SubscribeHandler extends Ba
.addListener(ChannelFutureListener.CLOSE);
return;
}
- // race with channel getting disconnected while we are adding it
- // to the 2 maps
- synchronized (channel) {
- boolean forceAttach = false;
- if (subRequest.hasForceAttach()) {
- forceAttach = subRequest.getForceAttach();
- }
-
- Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
- if (null != oldChannel) {
- boolean subSuccess = false;
- if (forceAttach) {
- // it is safe to close old channel here since new channel will be put
- // in sub2Channel / channel2Sub so there is no race between channel
- // getting disconnected and it.
- ChannelFuture future = oldChannel.close();
- future.addListener(CLOSE_OLD_CHANNEL_LISTENER);
- logger.info("New subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
- + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
- + " kills old channel " + oldChannel.getRemoteAddress());
- // try replace the oldChannel
- // if replace failure, it migth caused because channelDisconnect callback
- // has removed the old channel.
- if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
- // try to add it now.
- // if add failure, it means other one has obtained the channel
- oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
- if (null == oldChannel) {
- subSuccess = true;
- }
- } else {
- subSuccess = true;
- }
- }
- if (!subSuccess) {
- PubSubException pse = new PubSubException.TopicBusyException(
- "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
- + " is already being served on a different channel " + oldChannel.getRemoteAddress());
- subStats.incrementFailedOps();
- logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
- + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
- + " since it already being served on a different channel " + oldChannel.getRemoteAddress());
- channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
- .addListener(ChannelFutureListener.CLOSE);
- return;
- }
- }
- // channel2sub is just a cache, so we can add to it
- // without synchronization
- channel2sub.put(channel, topicSub);
+ boolean forceAttach = false;
+ if (subRequest.hasForceAttach()) {
+ forceAttach = subRequest.getForceAttach();
+ }
+ // Try to store the subscription channel for the topic subscriber
+ Channel oldChannel = subChannelMgr.put(topicSub, channel, forceAttach);
+ if (null != oldChannel) {
+ PubSubException pse = new PubSubException.TopicBusyException(
+ "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
+ + " is already being served on a different channel " + oldChannel + ".");
+ subStats.incrementFailedOps();
+ channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
}
// First write success and then tell the delivery manager,
// otherwise the first message might go out before the response
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java?rev=1400827&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java Mon Oct 22 10:10:38 2012
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class SubscriptionChannelManager implements ChannelDisconnectListener {
+
+ static Logger logger = LoggerFactory.getLogger(SubscriptionChannelManager.class);
+
+ private static ChannelFutureListener CLOSE_OLD_CHANNEL_LISTENER =
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.warn("Failed to close old subscription channel.");
+ } else {
+ logger.debug("Close old subscription channel succeed.");
+ }
+ }
+ };
+
+ final ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
+ final ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;
+
+ public SubscriptionChannelManager() {
+ sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+ channel2sub = new ConcurrentHashMap<Channel, TopicSubscriber>();
+ }
+
+ @Override
+ public void channelDisconnected(Channel channel) {
+ // Evils of synchronized programming: there is a race between a channel
+ // getting disconnected, and us adding it to the maps when a subscribe
+ // succeeds
+ synchronized (channel) {
+ TopicSubscriber topicSub = channel2sub.remove(channel);
+ if (topicSub != null) {
+ logger.info("Subscription channel {} for {} is disconnected.",
+ va(channel.getRemoteAddress(), topicSub));
+ // remove the entry only if it is still mapped to this channel.
+ sub2Channel.remove(topicSub, channel);
+ }
+ }
+ }
+
+ public int getNumSubscriptionChannels() {
+ return channel2sub.size();
+ }
+
+ public int getNumSubscriptions() {
+ return sub2Channel.size();
+ }
+
+ /**
+ * Put <code>topicSub</code> on Channel <code>channel</code>.
+ *
+ * @param topicSub
+ * Topic Subscription
+ * @param channel
+ * Netty channel
+ * @param forceAttach
+ * If true, close any existing channel for the subscriber and try to replace it
+ * @return null on success, otherwise the channel already registered for the subscriber.
+ */
+ public Channel put(TopicSubscriber topicSub, Channel channel, boolean forceAttach) {
+ // race with channel getting disconnected while we are adding it
+ // to the 2 maps
+ synchronized (channel) {
+ Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+ if (null != oldChannel) {
+ boolean subSuccess = false;
+ if (forceAttach) {
+ // it is safe to close old channel here since new channel will be put
+ // in sub2Channel / channel2Sub so there is no race between channel
+ // getting disconnected and it.
+ ChannelFuture future = oldChannel.close();
+ future.addListener(CLOSE_OLD_CHANNEL_LISTENER);
+ logger.info("Subscribe request for ({}) from channel ({}) kills old channel ({}).",
+ va(topicSub, channel, oldChannel));
+ // try to replace the oldChannel
+ // if the replace fails, it might be because the channelDisconnected callback
+ // has already removed the old channel.
+ if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
+ // try to add it now.
+ // if the add fails, it means another channel has obtained the subscription
+ oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+ if (null == oldChannel) {
+ subSuccess = true;
+ }
+ } else {
+ subSuccess = true;
+ }
+ }
+ if (!subSuccess) {
+ logger.error("Error serving subscribe request for ({}) from ({}) since it already served on ({}).",
+ va(topicSub, channel, oldChannel));
+ return oldChannel;
+ }
+ }
+ // channel2sub is just a cache of the reverse mapping, so no extra
+ // synchronization is needed (we still hold the channel lock here)
+ channel2sub.put(channel, topicSub);
+ return null;
+ }
+ }
+
+ /**
+ * Remove <code>topicSub</code> from Channel <code>channel</code>, warning if either mapping mismatches.
+ *
+ * @param topicSub
+ * Topic Subscription
+ * @param channel
+ * Netty channel
+ */
+ public void remove(TopicSubscriber topicSub, Channel channel) {
+ synchronized (channel) {
+ if (!channel2sub.remove(channel, topicSub)) {
+ logger.warn("Failed to remove subscription ({}) due to it isn't on channel ({}).",
+ va(topicSub, channel));
+ }
+ if (!sub2Channel.remove(topicSub, channel)) {
+ logger.warn("Failed to remove channel ({}) due to it isn't ({})'s channel.",
+ va(channel, topicSub));
+ }
+ }
+ }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Mon Oct 22 10:10:38 2012
@@ -61,6 +61,7 @@ import org.apache.hedwig.server.handlers
import org.apache.hedwig.server.handlers.NettyHandlerBean;
import org.apache.hedwig.server.handlers.PublishHandler;
import org.apache.hedwig.server.handlers.SubscribeHandler;
+import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
import org.apache.hedwig.server.handlers.UnsubscribeHandler;
import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
@@ -218,24 +219,31 @@ public class PubSubServer {
return tm;
}
- protected Map<OperationType, Handler> initializeNettyHandlers(TopicManager tm, DeliveryManager dm,
- PersistenceManager pm, SubscriptionManager sm) {
+ protected Map<OperationType, Handler> initializeNettyHandlers(
+ TopicManager tm, DeliveryManager dm,
+ PersistenceManager pm, SubscriptionManager sm,
+ SubscriptionChannelManager subChannelMgr) {
Map<OperationType, Handler> handlers = new HashMap<OperationType, Handler>();
handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf));
- handlers.put(OperationType.SUBSCRIBE, new SubscribeHandler(tm, dm, pm, sm, conf));
+ handlers.put(OperationType.SUBSCRIBE,
+ new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr));
handlers.put(OperationType.UNSUBSCRIBE, new UnsubscribeHandler(tm, conf, sm, dm));
handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf));
handlers = Collections.unmodifiableMap(handlers);
return handlers;
}
- protected void initializeNetty(SslServerContextFactory sslFactory, Map<OperationType, Handler> handlers) {
+ protected void initializeNetty(SslServerContextFactory sslFactory,
+ Map<OperationType, Handler> handlers,
+ SubscriptionChannelManager subChannelMgr) {
boolean isSSLEnabled = (sslFactory != null) ? true : false;
InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
- UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, isSSLEnabled);
- PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, sslFactory, conf
- .getMaximumMessageSize());
+ UmbrellaHandler umbrellaHandler =
+ new UmbrellaHandler(allChannels, handlers, subChannelMgr, isSSLEnabled);
+ PubSubServerPipelineFactory pipeline =
+ new PubSubServerPipelineFactory(umbrellaHandler, sslFactory,
+ conf.getMaximumMessageSize());
bootstrap.setPipelineFactory(pipeline);
bootstrap.setOption("child.tcpNoDelay", true);
@@ -297,14 +305,14 @@ public class PubSubServer {
unregisterJMX();
}
- protected void registerJMX(Map<OperationType, Handler> handlers) {
+ protected void registerJMX(SubscriptionChannelManager subChannelMgr) {
try {
String jmxName = JMXNAME_PREFIX + conf.getServerPort() + "_"
+ conf.getSSLServerPort();
jmxServerBean = new PubSubServerBean(jmxName);
HedwigMBeanRegistry.getInstance().register(jmxServerBean, null);
try {
- jmxNettyBean = new NettyHandlerBean(handlers);
+ jmxNettyBean = new NettyHandlerBean(subChannelMgr);
HedwigMBeanRegistry.getInstance().register(jmxNettyBean, jmxServerBean);
} catch (Exception e) {
logger.warn("Failed to register with JMX", e);
@@ -408,14 +416,17 @@ public class PubSubServer {
// Initialize the Netty Handlers (used by the
// UmbrellaHandler) once so they can be shared by
// both the SSL and non-SSL channels.
- Map<OperationType, Handler> handlers = initializeNettyHandlers(tm, dm, pm, sm);
+ SubscriptionChannelManager subChannelMgr = new SubscriptionChannelManager();
+ Map<OperationType, Handler> handlers =
+ initializeNettyHandlers(tm, dm, pm, sm, subChannelMgr);
// Initialize Netty for the regular non-SSL channels
- initializeNetty(null, handlers);
+ initializeNetty(null, handlers, subChannelMgr);
if (conf.isSSLEnabled()) {
- initializeNetty(new SslServerContextFactory(conf), handlers);
+ initializeNetty(new SslServerContextFactory(conf),
+ handlers, subChannelMgr);
}
// register jmx
- registerJMX(handlers);
+ registerJMX(subChannelMgr);
} catch (Exception e) {
ConcurrencyUtils.put(queue, Either.right(e));
return;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java Mon Oct 22 10:10:38 2012
@@ -48,17 +48,18 @@ import org.apache.hedwig.server.handlers
public class UmbrellaHandler extends SimpleChannelHandler {
static Logger logger = LoggerFactory.getLogger(UmbrellaHandler.class);
- private Map<OperationType, Handler> handlers;
- private ChannelGroup allChannels;
- private ChannelDisconnectListener subscribeHandler;
- private boolean isSSLEnabled = false;
+ private final Map<OperationType, Handler> handlers;
+ private final ChannelGroup allChannels;
+ private final ChannelDisconnectListener channelDisconnectListener;
+ private final boolean isSSLEnabled;
public UmbrellaHandler(ChannelGroup allChannels, Map<OperationType, Handler> handlers,
+ ChannelDisconnectListener channelDisconnectListener,
boolean isSSLEnabled) {
this.allChannels = allChannels;
this.isSSLEnabled = isSSLEnabled;
this.handlers = handlers;
- subscribeHandler = (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
+ this.channelDisconnectListener = channelDisconnectListener;
}
@Override
@@ -116,7 +117,7 @@ public class UmbrellaHandler extends Sim
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = ctx.getChannel();
// subscribe handler needs to know about channel disconnects
- subscribeHandler.channelDisconnected(channel);
+ channelDisconnectListener.channelDisconnected(channel);
channel.close();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Mon Oct 22 10:10:38 2012
@@ -39,6 +39,7 @@ import org.jboss.netty.logging.Log4JLogg
import org.apache.hedwig.client.HedwigClient;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
import org.apache.hedwig.server.handlers.Handler;
import org.apache.hedwig.server.netty.PubSubServer;
import org.apache.hedwig.server.netty.PubSubServerPipelineFactory;
@@ -112,7 +113,10 @@ public class HedwigProxy {
InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
allChannels = new DefaultChannelGroup("hedwigproxy");
ServerBootstrap bootstrap = new ServerBootstrap(serverSocketChannelFactory);
- UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, false);
+ ChannelDisconnectListener disconnectListener =
+ (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
+ UmbrellaHandler umbrellaHandler =
+ new UmbrellaHandler(allChannels, handlers, disconnectListener, false);
PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, null, cfg
.getMaximumMessageSize());
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Mon Oct 22 10:10:38 2012
@@ -56,6 +56,7 @@ public class TestSubUnsubHandler extends
SubscribeHandler sh;
StubDeliveryManager dm;
StubSubscriptionManager sm;
+ SubscriptionChannelManager subChannelMgr;
ByteString topic = ByteString.copyFromUtf8("topic");
WriteRecordingChannel channel;
@@ -75,7 +76,8 @@ public class TestSubUnsubHandler extends
dm = new StubDeliveryManager();
PersistenceManager pm = LocalDBPersistenceManager.instance();
sm = new StubSubscriptionManager(tm, pm, dm, conf, executor);
- sh = new SubscribeHandler(tm, dm, pm, sm, conf);
+ subChannelMgr = new SubscriptionChannelManager();
+ sh = new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr);
channel = new WriteRecordingChannel();
subscriberId = ByteString.copyFromUtf8("subId");
@@ -105,8 +107,10 @@ public class TestSubUnsubHandler extends
assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
// make sure the channel was put in the maps
- assertEquals(new TopicSubscriber(topic, subscriberId), sh.channel2sub.get(channel));
- assertEquals(channel, sh.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
+ assertEquals(new TopicSubscriber(topic, subscriberId),
+ subChannelMgr.channel2sub.get(channel));
+ assertEquals(channel,
+ subChannelMgr.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
// make sure delivery was started
StartServingRequest startRequest = (StartServingRequest) dm.lastRequest.poll();
@@ -134,7 +138,7 @@ public class TestSubUnsubHandler extends
assertEquals(StatusCode.TOPIC_BUSY, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
// after disconnecting the channel, subscribe should work again
- sh.channelDisconnected(channel);
+ subChannelMgr.channelDisconnected(channel);
dupChannel = new WriteRecordingChannel();
sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);