You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/16 12:00:47 UTC

svn commit: r508366 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/state/ systests/src/main/java/org/apache/qp...

Author: ritchiem
Date: Fri Feb 16 03:00:46 2007
New Revision: 508366

URL: http://svn.apache.org/viewvc?view=rev&rev=508366
Log:
QPID-372 Broker doesn't wait for ChannelClose-Ok.
Updated AMQProtocolSession with new methods to query whether a channel is awaiting close-ok and to release it from that state. Once a channel has been signalled to close, any further methods on that channel are ignored until the corresponding close-ok is received by the broker.

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?view=diff&rev=508366&r1=508365&r2=508366
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Fri Feb 16 03:00:46 2007
@@ -44,6 +44,10 @@
 
     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
     {
-        _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+        int channelId = evt.getChannelId();
+        _logger.info("Received channel-close-ok for channel-id " + channelId);
+        
+        // Let the Protocol Session know the channel is now closed.
+        stateManager.getProtocolSession().closeChannelOk(channelId);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=508366&r1=508365&r2=508366
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Feb 16 03:00:46 2007
@@ -64,6 +64,7 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
@@ -87,7 +88,7 @@
 
     private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
 
-    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1];
+    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
 
     private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
 
@@ -108,12 +109,12 @@
     private long _maxNoOfChannels = 1000;
 
     /* AMQP Version for this session */
-    private byte _major = pv[pv.length-1][PROTOCOL_MAJOR];
-    private byte _minor = pv[pv.length-1][PROTOCOL_MINOR];
+    private byte _major = pv[pv.length - 1][PROTOCOL_MAJOR];
+    private byte _minor = pv[pv.length - 1][PROTOCOL_MINOR];
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
-    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
-    
+    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length - 1][PROTOCOL_MAJOR], pv[pv.length - 1][PROTOCOL_MINOR]);
+    private List<Integer> _closingChannelsList = new ArrayList<Integer>();
 
 
     public ManagedObject getManagedObject()
@@ -129,7 +130,6 @@
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _minaProtocolSession = session;
         session.setAttachment(this);
-        
 
 
         _codecFactory = codecFactory;
@@ -145,24 +145,20 @@
         catch (RuntimeException e)
         {
             e.printStackTrace();
-        //    throw e;
+            //    throw e;
 
         }
 
-
-
-
 //        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
     }
 
-     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
+    public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
                                   AMQCodecFactory codecFactory, AMQStateManager stateManager)
             throws AMQException
     {
         _stateManager = stateManager;
         _minaProtocolSession = session;
         session.setAttachment(this);
-        
 
         _codecFactory = codecFactory;
 
@@ -205,7 +201,7 @@
                 pi.checkVersion(this); // Fails if not correct
 
                 // This sets the protocol version (and hence framing classes) for this session.
-                setProtocolVersion(pi.protocolMajor,pi.protocolMinor);
+                setProtocolVersion(pi.protocolMajor, pi.protocolMinor);
 
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
 
@@ -213,12 +209,12 @@
 
                 // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
                 AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
-            		_major, _minor,	// AMQP version (major, minor)
-                    locales.getBytes(),	// locales
-                    mechanisms.getBytes(),	// mechanisms
-                    null,	// serverProperties
-                	(short)_major,	// versionMajor
-                    (short)_minor);	// versionMinor
+                                                                       _major, _minor,    // AMQP version (major, minor)
+                                                                       locales.getBytes(),    // locales
+                                                                       mechanisms.getBytes(),    // mechanisms
+                                                                       null,    // serverProperties
+                                                                       (short) _major,    // versionMajor
+                                                                       (short) _minor);    // versionMinor
                 _minaProtocolSession.write(response);
             }
             catch (AMQException e)
@@ -266,7 +262,7 @@
             {
                 boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
 
-                if(!_frameListeners.isEmpty())
+                if (!_frameListeners.isEmpty())
                 {
                     for (AMQMethodListener listener : _frameListeners)
                     {
@@ -276,7 +272,7 @@
                 }
                 if (!wasAnyoneInterested)
                 {
-                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
                 }
             }
             catch (AMQChannelException e)
@@ -338,12 +334,12 @@
         {
             _logger.debug("Content body frame received: " + frame);
         }
-        getChannel(frame.getChannel()).publishContentBody((ContentBody)frame.getBodyFrame(), this);
+        getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
     }
 
     /**
-     * Convenience method that writes a frame to the protocol session. Equivalent
-     * to calling getProtocolSession().write().
+     * Convenience method that writes a frame to the protocol session. Equivalent to calling
+     * getProtocolSession().write().
      *
      * @param frame the frame to write
      */
@@ -370,22 +366,40 @@
 
     public AMQChannel getChannel(int channelId) throws AMQException
     {
-        return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
-                ? _cachedChannels[channelId]
-                : _channelMap.get(channelId);
+        if (channelAwaitingClosure(channelId))
+        {
+            return null;
+        }
+        else
+        {
+            return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+                   ? _cachedChannels[channelId]
+                   : _channelMap.get(channelId);
+        }
+    }
+
+    public boolean channelAwaitingClosure(int channelId)
+    {
+        return _closingChannelsList.contains(channelId);
     }
 
     public void addChannel(AMQChannel channel) throws AMQException
     {
         if (_closed)
         {
-            throw new AMQException("Session is closed");    
+            throw new AMQException("Session is closed");
         }
 
         final int channelId = channel.getChannelId();
+
+        if (_closingChannelsList.contains(channelId))
+        {
+            throw new AMQException("Session is marked awaiting channel close");
+        }
+
         _channelMap.put(channelId, channel);
 
-        if(((channelId & CHANNEL_CACHE_SIZE) == channelId))
+        if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
         {
             _cachedChannels[channelId] = channel;
         }
@@ -428,12 +442,12 @@
     }
 
     /**
-     * Close a specific channel. This will remove any resources used by the channel, including:
-     * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
-     * </ul>
+     * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+     * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
      *
      * @param channelId id of the channel to close
-     * @throws AMQException if an error occurs closing the channel
+     *
+     * @throws AMQException             if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
      */
     public void closeChannel(int channelId) throws AMQException
@@ -447,16 +461,26 @@
         {
             try
             {
+                markChannelawaitingCloseOk(channelId);
                 channel.close(this);
             }
             finally
             {
                 removeChannel(channelId);
-
             }
         }
     }
 
+    public void closeChannelOk(int channelId)
+    {
+        _closingChannelsList.remove(new Integer(channelId));
+    }
+
+    private void markChannelawaitingCloseOk(int channelId)
+    {
+        _closingChannelsList.add(channelId);
+    }
+
     /**
      * In our current implementation this is used by the clustering code.
      *
@@ -465,7 +489,7 @@
     public void removeChannel(int channelId)
     {
         _channelMap.remove(channelId);
-        if((channelId & CHANNEL_CACHE_SIZE) == channelId)
+        if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
         {
             _cachedChannels[channelId] = null;
         }
@@ -486,8 +510,7 @@
     }
 
     /**
-     * Closes all channels that were opened by this protocol session. This frees up all resources
-     * used by the channel.
+     * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
      *
      * @throws AMQException if an error occurs while closing any channel
      */
@@ -498,16 +521,13 @@
             channel.close(this);
         }
         _channelMap.clear();
-        for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
+        for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
         {
-            _cachedChannels[i]=null;
+            _cachedChannels[i] = null;
         }
     }
 
-    /**
-     * This must be called when the session is _closed in order to free up any resources
-     * managed by the session.
-     */
+    /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     public void closeSession() throws AMQException
     {
         if (!_closed)
@@ -518,7 +538,7 @@
             {
                 _managedObject.unregister();
             }
-            for(Task task : _taskList)
+            for (Task task : _taskList)
             {
                 task.doTask(this);
             }
@@ -535,17 +555,15 @@
         return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
     }
 
-    /**
-     * @return an object that can be used to identity
-     */
+    /** @return an object that can be used to identity */
     public Object getKey()
     {
         return _minaProtocolSession.getRemoteAddress();
     }
 
     /**
-     * Get the fully qualified domain name of the local address to which this session is bound. Since some servers
-     * may be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+     * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+     * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
      *
      * @return a String FQDN
      */
@@ -586,7 +604,7 @@
     public void setClientProperties(FieldTable clientProperties)
     {
         _clientProperties = clientProperties;
-        if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+        if ((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
         {
             setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
         }
@@ -596,7 +614,7 @@
     {
         _major = major;
         _minor = minor;
-        _registry = MainRegistry.getVersionSpecificRegistry(major,minor);
+        _registry = MainRegistry.getVersionSpecificRegistry(major, minor);
     }
 
     public byte getProtocolMajorVersion()
@@ -620,10 +638,9 @@
     }
 
 
-
     public Object getClientIdentifier()
     {
-        return _minaProtocolSession.getRemoteAddress();    
+        return _minaProtocolSession.getRemoteAddress();
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=508366&r1=508365&r2=508366
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Feb 16 03:00:46 2007
@@ -34,8 +34,6 @@
 public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
 {
 
-
-
     public static interface Task
     {
         public void doTask(AMQProtocolSession session) throws AMQException;
@@ -43,88 +41,108 @@
 
     /**
      * Called when a protocol data block is received
+     *
      * @param message the data block that has been received
+     *
      * @throws Exception if processing the datablock fails
      */
     void dataBlockReceived(AMQDataBlock message) throws Exception;
 
     /**
-     * Get the context key associated with this session. Context key is described
-     * in the AMQ protocol specification (RFC 6).
+     * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
+     * 6).
+     *
      * @return the context key
      */
     AMQShortString getContextKey();
 
     /**
-     * Set the context key associated with this session. Context key is described
-     * in the AMQ protocol specification (RFC 6).
+     * Set the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
+     * 6).
+     *
      * @param contextKey the context key
      */
     void setContextKey(AMQShortString contextKey);
 
     /**
-     * Get the channel for this session associated with the specified id. A channel
-     * id is unique per connection (i.e. per session).
+     * Get the channel for this session associated with the specified id. A channel id is unique per connection (i.e.
+     * per session).
+     *
      * @param channelId the channel id which must be valid
+     *
      * @return null if no channel exists, the channel otherwise
      */
     AMQChannel getChannel(int channelId) throws AMQException;
 
     /**
      * Associate a channel with this session.
-     * @param channel the channel to associate with this session. It is an error to
-     * associate the same channel with more than one session but this is not validated.
+     *
+     * @param channel the channel to associate with this session. It is an error to associate the same channel with more
+     *                than one session but this is not validated.
      */
     void addChannel(AMQChannel channel) throws AMQException;
 
     /**
-     * Close a specific channel. This will remove any resources used by the channel, including:
-     * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
-     * </ul>
+     * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+     * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
+     *
      * @param channelId id of the channel to close
+     *
      * @throws org.apache.qpid.AMQException if an error occurs closing the channel
-     * @throws IllegalArgumentException if the channel id is not valid
+     * @throws IllegalArgumentException     if the channel id is not valid
      */
     void closeChannel(int channelId) throws AMQException;
 
     /**
+     * Marks the specific channel as closed. This will release the lock for that channel id so a new channel can be
+     * created on that id.
+     *
+     * @param channelId id of the channel to close
+     */
+    void closeChannelOk(int channelId);
+
+    /**
+     * Check to see if this channel is closing
+     *
+     * @param channelId id to check
+     * @return boolean with state of channel awaiting closure
+     */
+    boolean channelAwaitingClosure(int channelId);
+
+    /**
      * Remove a channel from the session but do not close it.
+     *
      * @param channelId
      */
     void removeChannel(int channelId);
 
     /**
      * Initialise heartbeats on the session.
+     *
      * @param delay delay in seconds (not ms)
      */
     void initHeartbeats(int delay);
 
-    /**
-     * This must be called when the session is _closed in order to free up any resources
-     * managed by the session.
-     */
+    /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     void closeSession() throws AMQException;
 
-    /**
-     * @return a key that uniquely identifies this session
-     */
+    /** @return a key that uniquely identifies this session */
     Object getKey();
 
     /**
-     * Get the fully qualified domain name of the local address to which this session is bound. Since some servers
-     * may be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+     * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+     * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
      *
      * @return a String FQDN
      */
     String getLocalFQDN();
 
-    /**
-     * @return the sasl server that can perform authentication for this session.
-     */
+    /** @return the sasl server that can perform authentication for this session. */
     SaslServer getSaslServer();
 
     /**
      * Set the sasl server that is to perform authentication for this session.
+     *
      * @param saslServer
      */
     void setSaslServer(SaslServer saslServer);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=508366&r1=508365&r2=508366
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Feb 16 03:00:46 2007
@@ -89,10 +89,8 @@
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 /**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
- *
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
  */
 public class AMQStateManager implements AMQMethodListener
 {
@@ -100,14 +98,12 @@
 
     private final VirtualHostRegistry _virtualHostRegistry;
     private final AMQProtocolSession _protocolSession;
-    /**
-     * The current state
-     */
+    /** The current state */
     private AMQState _currentState;
 
     /**
-     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
-     * The class must be a subclass of AMQFrame.
+     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+     * AMQFrame.
      */
     private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
             new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class);
@@ -206,7 +202,7 @@
 
     public void error(Exception e)
     {
-        _logger.error("State manager received error notification: " + e, e);
+        _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e);
         for (StateListener l : _stateListeners)
         {
             l.error(e);
@@ -221,7 +217,7 @@
 
             checkChannel(evt, _protocolSession);
 
-            handler.methodReceived(this,  evt);
+            handler.methodReceived(this, evt);
             return true;
         }
         return false;
@@ -230,16 +226,17 @@
     private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
             throws AMQException
     {
-        if(evt.getChannelId() != 0
-                && !(evt.getMethod() instanceof ChannelOpenBody)
-                && protocolSession.getChannel(evt.getChannelId()) == null)
+        if (evt.getChannelId() != 0
+            && !(evt.getMethod() instanceof ChannelOpenBody)
+            && (protocolSession.getChannel(evt.getChannelId()) == null)
+            && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
         {
-            throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId());
+            throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId());
         }
     }
 
     protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
-                                                                                             B frame)
+                                                                                               B frame)
             throws IllegalStateTransitionException
     {
         if (_logger.isDebugEnabled())
@@ -250,8 +247,8 @@
                 classToHandlerMap = _state2HandlersMap.get(currentState);
 
         final StateAwareMethodListener<B> handler = classToHandlerMap == null
-                                                          ? null
-                                                          : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
+                                                    ? null
+                                                    : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
 
         if (handler == null)
         {

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=508366&r1=508365&r2=508366
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Fri Feb 16 03:00:46 2007
@@ -94,6 +94,16 @@
     {
     }
 
+    public void closeChannelOk(int channelId)
+    {
+        
+    }
+
+    public boolean channelAwaitingClosure(int channelId)
+    {
+        return false;
+    }
+
     public void removeChannel(int channelId)
     {
         _channelMap.remove(channelId);