You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/22 12:10:38 UTC

svn commit: r1400827 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ hedwig-server/src/main/java/org/apache/hedwig/server/netty/ hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ hedwig-serv...

Author: ivank
Date: Mon Oct 22 10:10:38 2012
New Revision: 1400827

URL: http://svn.apache.org/viewvc?rev=1400827&view=rev
Log:
BOOKKEEPER-435: Create SubscriptionChannelManager to manage all subscription channel (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct 22 10:10:38 2012
@@ -186,6 +186,8 @@ Trunk (unreleased changes)
 
 	BOOKKEEPER-422: Simplify AbstractSubscriptionManager (stu via fpj)
 
+        BOOKKEEPER-435: Create SubscriptionChannelManager to manage all subscription channel (sijie via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java Mon Oct 22 10:10:38 2012
@@ -18,19 +18,15 @@
 
 package org.apache.hedwig.server.handlers;
 
-import java.util.Map;
-
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
 import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
 
 public class NettyHandlerBean implements NettyHandlerMXBean, HedwigMBeanInfo {
 
-    Map<OperationType, Handler> handlers;
-    SubscribeHandler subHandler;
+    SubscriptionChannelManager subChannelMgr;
 
-    public NettyHandlerBean(Map<OperationType, Handler> handlers) {
-        this.handlers = handlers;
-        subHandler = (SubscribeHandler) this.handlers.get(OperationType.SUBSCRIBE);
+   public NettyHandlerBean(SubscriptionChannelManager subChannelMgr) {
+       this.subChannelMgr = subChannelMgr;
     }
 
     @Override
@@ -45,7 +41,7 @@ public class NettyHandlerBean implements
 
     @Override
     public int getNumSubscriptionChannels() {
-        return subHandler.sub2Channel.size();
+        return subChannelMgr.getNumSubscriptionChannels();
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Mon Oct 22 10:10:38 2012
@@ -17,8 +17,6 @@
  */
 package org.apache.hedwig.server.handlers;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
@@ -54,53 +52,32 @@ import org.apache.hedwig.server.subscrip
 import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
 
-public class SubscribeHandler extends BaseHandler implements ChannelDisconnectListener {
+public class SubscribeHandler extends BaseHandler {
     static Logger logger = LoggerFactory.getLogger(SubscribeHandler.class);
 
-    private DeliveryManager deliveryMgr;
-    private PersistenceManager persistenceMgr;
-    private SubscriptionManager subMgr;
-    ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
-    ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;
+    private final DeliveryManager deliveryMgr;
+    private final PersistenceManager persistenceMgr;
+    private final SubscriptionManager subMgr;
+    private final SubscriptionChannelManager subChannelMgr;
+
     // op stats
     private final OpStats subStats;
 
-    private static ChannelFutureListener CLOSE_OLD_CHANNEL_LISTENER = new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-            if (!future.isSuccess()) {
-                logger.warn("Failed to close old subscription channel.");
-            } else {
-                logger.debug("Close old subscription channel succeed.");
-            }
-        }
-    };
-
-    public SubscribeHandler(TopicManager topicMgr, DeliveryManager deliveryManager, PersistenceManager persistenceMgr,
-                            SubscriptionManager subMgr, ServerConfiguration cfg) {
+    public SubscribeHandler(ServerConfiguration cfg, TopicManager topicMgr,
+                            DeliveryManager deliveryManager,
+                            PersistenceManager persistenceMgr,
+                            SubscriptionManager subMgr,
+                            SubscriptionChannelManager subChannelMgr) {
         super(topicMgr, cfg);
         this.deliveryMgr = deliveryManager;
         this.persistenceMgr = persistenceMgr;
         this.subMgr = subMgr;
-        sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
-        channel2sub = new ConcurrentHashMap<Channel, TopicSubscriber>();
+        this.subChannelMgr = subChannelMgr;
         subStats = ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE);
     }
 
-    public void channelDisconnected(Channel channel) {
-        // Evils of synchronized programming: there is a race between a channel
-        // getting disconnected, and us adding it to the maps when a subscribe
-        // succeeds
-        synchronized (channel) {
-            TopicSubscriber topicSub = channel2sub.remove(channel);
-            if (topicSub != null) {
-                // remove entry only currently mapped to given value.
-                sub2Channel.remove(topicSub, channel);
-            }
-        }
-    }
-
     @Override
     public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
 
@@ -192,56 +169,20 @@ public class SubscribeHandler extends Ba
                     .addListener(ChannelFutureListener.CLOSE);
                     return;
                 }
-                // race with channel getting disconnected while we are adding it
-                // to the 2 maps
-                synchronized (channel) {
-                    boolean forceAttach = false;
-                    if (subRequest.hasForceAttach()) {
-                        forceAttach = subRequest.getForceAttach();
-                    }
-
-                    Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
-                    if (null != oldChannel) {
-                        boolean subSuccess = false;
-                        if (forceAttach) {
-                            // it is safe to close old channel here since new channel will be put
-                            // in sub2Channel / channel2Sub so there is no race between channel
-                            // getting disconnected and it.
-                            ChannelFuture future = oldChannel.close();
-                            future.addListener(CLOSE_OLD_CHANNEL_LISTENER);
-                            logger.info("New subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
-                                      + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
-                                      + " kills old channel " + oldChannel.getRemoteAddress());
-                            // try replace the oldChannel
-                            // if replace failure, it migth caused because channelDisconnect callback
-                            // has removed the old channel.
-                            if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
-                                // try to add it now.
-                                // if add failure, it means other one has obtained the channel
-                                oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
-                                if (null == oldChannel) {
-                                    subSuccess = true;
-                                }
-                            } else {
-                                subSuccess = true;
-                            }
-                        }
-                        if (!subSuccess) {
-                            PubSubException pse = new PubSubException.TopicBusyException(
-                                "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
-                                + " is already being served on a different channel " + oldChannel.getRemoteAddress());
-                            subStats.incrementFailedOps();
-                            logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: " + topic.toStringUtf8()
-                                       + ", subscriber: " + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress()
-                                       + " since it already being served on a different channel " + oldChannel.getRemoteAddress());
-                            channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
-                            .addListener(ChannelFutureListener.CLOSE);
-                            return;
-                        }
-                    }
-                    // channel2sub is just a cache, so we can add to it
-                    // without synchronization
-                    channel2sub.put(channel, topicSub);
+                boolean forceAttach = false;
+                if (subRequest.hasForceAttach()) {
+                    forceAttach = subRequest.getForceAttach();
+                }
+                // Try to store the subscription channel for the topic subscriber
+                Channel oldChannel = subChannelMgr.put(topicSub, channel, forceAttach);
+                if (null != oldChannel) {
+                    PubSubException pse = new PubSubException.TopicBusyException(
+                        "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
+                        + " is already being served on a different channel " + oldChannel + ".");
+                    subStats.incrementFailedOps();
+                    channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
+                    .addListener(ChannelFutureListener.CLOSE);
+                    return;
                 }
                 // First write success and then tell the delivery manager,
                 // otherwise the first message might go out before the response

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java?rev=1400827&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java Mon Oct 22 10:10:38 2012
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class SubscriptionChannelManager implements ChannelDisconnectListener {
+
+    static Logger logger = LoggerFactory.getLogger(SubscriptionChannelManager.class);
+
+    private static ChannelFutureListener CLOSE_OLD_CHANNEL_LISTENER =
+    new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+                logger.warn("Failed to close old subscription channel.");
+            } else {
+                logger.debug("Close old subscription channel succeed.");
+            }
+        }
+    };
+
+    final ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
+    final ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;
+
+    public SubscriptionChannelManager() {
+        sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+        channel2sub = new ConcurrentHashMap<Channel, TopicSubscriber>();
+    }
+
+    @Override
+    public void channelDisconnected(Channel channel) {
+        // Evils of synchronized programming: there is a race between a channel
+        // getting disconnected, and us adding it to the maps when a subscribe
+        // succeeds
+        synchronized (channel) {
+            TopicSubscriber topicSub = channel2sub.remove(channel);
+            if (topicSub != null) {
+                logger.info("Subscription channel {} for {} is disconnected.",
+                            va(channel.getRemoteAddress(), topicSub));
+                // remove entry only currently mapped to given value.
+                sub2Channel.remove(topicSub, channel);
+            }
+        }
+    }
+
+    public int getNumSubscriptionChannels() {
+        return channel2sub.size();
+    }
+
+    public int getNumSubscriptions() {
+        return sub2Channel.size();
+    }
+
+    /**
+     * Put <code>topicSub</code> on Channel <code>channel</code>.
+     *
+     * @param topicSub
+     *          Topic Subscription
+     * @param channel
+     *          Netty channel
+     * @param forceAttach
+     *          If true, close any channel already serving the subscriber and attach this one
+     * @return null on success, otherwise the existing channel still serving the subscriber.
+     */
+    public Channel put(TopicSubscriber topicSub, Channel channel, boolean forceAttach) {
+        // race with channel getting disconnected while we are adding it
+        // to the 2 maps
+        synchronized (channel) {
+            Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+            if (null != oldChannel) {
+                boolean subSuccess = false;
+                if (forceAttach) {
+                    // it is safe to close old channel here since new channel will be put
+                    // in sub2Channel / channel2Sub so there is no race between channel
+                    // getting disconnected and it.
+                    ChannelFuture future = oldChannel.close();
+                    future.addListener(CLOSE_OLD_CHANNEL_LISTENER);
+                    logger.info("Subscribe request for ({}) from channel ({}) kills old channel ({}).",
+                                va(topicSub, channel, oldChannel));
+                    // try to replace the oldChannel.
+                    // if the replace fails, it might be because the channelDisconnected
+                    // callback has already removed the old channel.
+                    if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
+                        // try to add it now.
+                        // if the add fails, another channel has already been registered for it
+                        oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
+                        if (null == oldChannel) {
+                            subSuccess = true;
+                        }
+                    } else {
+                        subSuccess = true;
+                    }
+                }
+                if (!subSuccess) {
+                    logger.error("Error serving subscribe request for ({}) from ({}) since it already served on ({}).",
+                                 va(topicSub, channel, oldChannel));
+                    return oldChannel;
+                }
+            }
+            // channel2sub is just a cache, so we can add to it
+            // without synchronization
+            channel2sub.put(channel, topicSub);
+            return null;
+        }
+    }
+
+    /**
+     * Remove <code>topicSub</code> from Channel <code>channel</code>
+     *
+     * @param topicSub
+     *          Topic Subscription
+     * @param channel
+     *          Netty channel
+     */
+    public void remove(TopicSubscriber topicSub, Channel channel) {
+        synchronized (channel) {
+            if (!channel2sub.remove(channel, topicSub)) {
+                logger.warn("Failed to remove subscription ({}) due to it isn't on channel ({}).",
+                            va(topicSub, channel));
+            }
+            if (!sub2Channel.remove(topicSub, channel)) {
+                logger.warn("Failed to remove channel ({}) due to it isn't ({})'s channel.",
+                            va(channel, topicSub));
+            }
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Mon Oct 22 10:10:38 2012
@@ -61,6 +61,7 @@ import org.apache.hedwig.server.handlers
 import org.apache.hedwig.server.handlers.NettyHandlerBean;
 import org.apache.hedwig.server.handlers.PublishHandler;
 import org.apache.hedwig.server.handlers.SubscribeHandler;
+import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
 import org.apache.hedwig.server.handlers.UnsubscribeHandler;
 import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
@@ -218,24 +219,31 @@ public class PubSubServer {
         return tm;
     }
 
-    protected Map<OperationType, Handler> initializeNettyHandlers(TopicManager tm, DeliveryManager dm,
-            PersistenceManager pm, SubscriptionManager sm) {
+   protected Map<OperationType, Handler> initializeNettyHandlers(
+           TopicManager tm, DeliveryManager dm,
+           PersistenceManager pm, SubscriptionManager sm,
+           SubscriptionChannelManager subChannelMgr) {
         Map<OperationType, Handler> handlers = new HashMap<OperationType, Handler>();
         handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf));
-        handlers.put(OperationType.SUBSCRIBE, new SubscribeHandler(tm, dm, pm, sm, conf));
+        handlers.put(OperationType.SUBSCRIBE,
+                     new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr));
         handlers.put(OperationType.UNSUBSCRIBE, new UnsubscribeHandler(tm, conf, sm, dm));
         handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf));
         handlers = Collections.unmodifiableMap(handlers);
         return handlers;
     }
 
-    protected void initializeNetty(SslServerContextFactory sslFactory, Map<OperationType, Handler> handlers) {
+    protected void initializeNetty(SslServerContextFactory sslFactory,
+                                   Map<OperationType, Handler> handlers,
+                                   SubscriptionChannelManager subChannelMgr) {
         boolean isSSLEnabled = (sslFactory != null) ? true : false;
         InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
         ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
-        UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, isSSLEnabled);
-        PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, sslFactory, conf
-                .getMaximumMessageSize());
+        UmbrellaHandler umbrellaHandler =
+            new UmbrellaHandler(allChannels, handlers, subChannelMgr, isSSLEnabled);
+        PubSubServerPipelineFactory pipeline =
+            new PubSubServerPipelineFactory(umbrellaHandler, sslFactory,
+                                            conf.getMaximumMessageSize());
 
         bootstrap.setPipelineFactory(pipeline);
         bootstrap.setOption("child.tcpNoDelay", true);
@@ -297,14 +305,14 @@ public class PubSubServer {
         unregisterJMX();
     }
 
-    protected void registerJMX(Map<OperationType, Handler> handlers) {
+    protected void registerJMX(SubscriptionChannelManager subChannelMgr) {
         try {
             String jmxName = JMXNAME_PREFIX + conf.getServerPort() + "_"
                                             + conf.getSSLServerPort();
             jmxServerBean = new PubSubServerBean(jmxName);
             HedwigMBeanRegistry.getInstance().register(jmxServerBean, null);
             try {
-                jmxNettyBean = new NettyHandlerBean(handlers);
+                jmxNettyBean = new NettyHandlerBean(subChannelMgr);
                 HedwigMBeanRegistry.getInstance().register(jmxNettyBean, jmxServerBean);
             } catch (Exception e) {
                 logger.warn("Failed to register with JMX", e);
@@ -408,14 +416,17 @@ public class PubSubServer {
                     // Initialize the Netty Handlers (used by the
                     // UmbrellaHandler) once so they can be shared by
                     // both the SSL and non-SSL channels.
-                    Map<OperationType, Handler> handlers = initializeNettyHandlers(tm, dm, pm, sm);
+                    SubscriptionChannelManager subChannelMgr = new SubscriptionChannelManager();
+                    Map<OperationType, Handler> handlers =
+                        initializeNettyHandlers(tm, dm, pm, sm, subChannelMgr);
                     // Initialize Netty for the regular non-SSL channels
-                    initializeNetty(null, handlers);
+                    initializeNetty(null, handlers, subChannelMgr);
                     if (conf.isSSLEnabled()) {
-                        initializeNetty(new SslServerContextFactory(conf), handlers);
+                        initializeNetty(new SslServerContextFactory(conf),
+                                        handlers, subChannelMgr);
                     }
                     // register jmx
-                    registerJMX(handlers);
+                    registerJMX(subChannelMgr);
                 } catch (Exception e) {
                     ConcurrencyUtils.put(queue, Either.right(e));
                     return;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java Mon Oct 22 10:10:38 2012
@@ -48,17 +48,18 @@ import org.apache.hedwig.server.handlers
 public class UmbrellaHandler extends SimpleChannelHandler {
     static Logger logger = LoggerFactory.getLogger(UmbrellaHandler.class);
 
-    private Map<OperationType, Handler> handlers;
-    private ChannelGroup allChannels;
-    private ChannelDisconnectListener subscribeHandler;
-    private boolean isSSLEnabled = false;
+    private final Map<OperationType, Handler> handlers;
+    private final ChannelGroup allChannels;
+    private final ChannelDisconnectListener channelDisconnectListener;
+    private final boolean isSSLEnabled; 
 
     public UmbrellaHandler(ChannelGroup allChannels, Map<OperationType, Handler> handlers,
+                           ChannelDisconnectListener channelDisconnectListener,
                            boolean isSSLEnabled) {
         this.allChannels = allChannels;
         this.isSSLEnabled = isSSLEnabled;
         this.handlers = handlers;
-        subscribeHandler = (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
+        this.channelDisconnectListener = channelDisconnectListener;
     }
 
     @Override
@@ -116,7 +117,7 @@ public class UmbrellaHandler extends Sim
     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         Channel channel = ctx.getChannel();
         // subscribe handler needs to know about channel disconnects
-        subscribeHandler.channelDisconnected(channel);
+        channelDisconnectListener.channelDisconnected(channel);
         channel.close();
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Mon Oct 22 10:10:38 2012
@@ -39,6 +39,7 @@ import org.jboss.netty.logging.Log4JLogg
 import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
 import org.apache.hedwig.server.handlers.Handler;
 import org.apache.hedwig.server.netty.PubSubServer;
 import org.apache.hedwig.server.netty.PubSubServerPipelineFactory;
@@ -112,7 +113,10 @@ public class HedwigProxy {
         InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
         allChannels = new DefaultChannelGroup("hedwigproxy");
         ServerBootstrap bootstrap = new ServerBootstrap(serverSocketChannelFactory);
-        UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, false);
+        ChannelDisconnectListener disconnectListener =
+            (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
+        UmbrellaHandler umbrellaHandler =
+            new UmbrellaHandler(allChannels, handlers, disconnectListener, false);
         PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, null, cfg
                 .getMaximumMessageSize());
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1400827&r1=1400826&r2=1400827&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Mon Oct 22 10:10:38 2012
@@ -56,6 +56,7 @@ public class TestSubUnsubHandler extends
     SubscribeHandler sh;
     StubDeliveryManager dm;
     StubSubscriptionManager sm;
+    SubscriptionChannelManager subChannelMgr;
     ByteString topic = ByteString.copyFromUtf8("topic");
     WriteRecordingChannel channel;
 
@@ -75,7 +76,8 @@ public class TestSubUnsubHandler extends
         dm = new StubDeliveryManager();
         PersistenceManager pm = LocalDBPersistenceManager.instance();
         sm = new StubSubscriptionManager(tm, pm, dm, conf, executor);
-        sh = new SubscribeHandler(tm, dm, pm, sm, conf);
+        subChannelMgr = new SubscriptionChannelManager();
+        sh = new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr);
         channel = new WriteRecordingChannel();
 
         subscriberId = ByteString.copyFromUtf8("subId");
@@ -105,8 +107,10 @@ public class TestSubUnsubHandler extends
         assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
 
         // make sure the channel was put in the maps
-        assertEquals(new TopicSubscriber(topic, subscriberId), sh.channel2sub.get(channel));
-        assertEquals(channel, sh.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
+        assertEquals(new TopicSubscriber(topic, subscriberId),
+                     subChannelMgr.channel2sub.get(channel));
+        assertEquals(channel,
+                     subChannelMgr.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
 
         // make sure delivery was started
         StartServingRequest startRequest = (StartServingRequest) dm.lastRequest.poll();
@@ -134,7 +138,7 @@ public class TestSubUnsubHandler extends
         assertEquals(StatusCode.TOPIC_BUSY, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
 
         // after disconnecting the channel, subscribe should work again
-        sh.channelDisconnected(channel);
+        subChannelMgr.channelDisconnected(channel);
 
         dupChannel = new WriteRecordingChannel();
         sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);