You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/30 14:24:42 UTC

svn commit: r533721 - in /incubator/qpid/branches/M2/java/client/src: main/java/org/apache/qpid/client/ main/java/org/apache/qpid/client/protocol/ test/java/org/apache/qpid/test/unit/client/protocol/

Author: rgodfrey
Date: Mon Apr 30 05:24:41 2007
New Revision: 533721

URL: http://svn.apache.org/viewvc?view=rev&rev=533721
Log:
QPID-476: Remove the duplicate channelId-to-session map (AMQProtocolSession now looks sessions up via AMQConnection)

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Apr 30 05:24:41 2007
@@ -96,7 +96,8 @@
     private AMQProtocolHandler _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
-    private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+    private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>();
+
 
     private String _clientName;
 
@@ -508,7 +509,7 @@
                             AMQSession session =
                                 new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
                                     prefetchLow);
-                            _protocolHandler.addSessionByChannel(channelId, session);
+                            //_protocolHandler.addSessionByChannel(channelId, session);
                             registerSession(channelId, session);
 
                             boolean success = false;
@@ -527,7 +528,6 @@
                             {
                                 if (!success)
                                 {
-                                    _protocolHandler.removeSessionByChannel(channelId);
                                     deregisterSession(channelId);
                                 }
                             }
@@ -589,7 +589,6 @@
         }
         catch (AMQException e)
         {
-            _protocolHandler.removeSessionByChannel(channelId);
             deregisterSession(channelId);
             throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e);
         }
@@ -1136,7 +1135,7 @@
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {
             AMQSession s = (AMQSession) it.next();
-            _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+            //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
             reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
             s.resubscribe();
         }
@@ -1222,5 +1221,11 @@
     public void performConnectionTask(Runnable task)
     {
         _taskPool.execute(task);
+    }
+
+
+    public AMQSession getSession(int channelId)
+    {
+        return _sessions.get(channelId);
     }
 }

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Apr 30 05:24:41 2007
@@ -429,17 +429,7 @@
         }
     }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
-               MessageFactoryRegistry messageFactoryRegistry)
-    {
-        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
-    }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
-               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
-    {
-        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
-    }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
@@ -493,15 +483,7 @@
         }
     }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode)
-    {
-        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry());
-    }
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch)
-    {
-        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
-    }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
     {
@@ -796,7 +778,7 @@
                 amqe = new AMQException("Closing session forcibly", e);
             }
             _connection.deregisterSession(_channelId);
-            closeProducersAndConsumers(amqe);
+            closeProducersAndConsumers(amqe);            
         }
     }
 
@@ -809,6 +791,7 @@
         _closed.set(true);
         _connection.deregisterSession(_channelId);
         markClosedProducersAndConsumers();
+
     }
 
     private void markClosedProducersAndConsumers()

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Apr 30 05:24:41 2007
@@ -490,27 +490,7 @@
                                                 new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
     }
 
-    /**
-     * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
-     * handler will ensure that messages are delivered to the consumer(s) on that session.
-     *
-     * @param channelId the channel id of the session
-     * @param session   the session instance.
-     */
-    public void addSessionByChannel(int channelId, AMQSession session)
-    {
-        _protocolSession.addSessionByChannel(channelId, session);
-    }
 
-    /**
-     * Convenience method to deregister an AMQSession with the protocol handler.
-     *
-     * @param channelId then channel id of the session
-     */
-    public void removeSessionByChannel(int channelId)
-    {
-        _protocolSession.removeSessionByChannel(channelId);
-    }
 
     public void closeSession(AMQSession session) throws AMQException
     {

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Apr 30 05:24:41 2007
@@ -85,7 +85,7 @@
     protected final AMQProtocolHandler _protocolHandler;
 
     /** Maps from the channel id to the AMQSession that it represents. */
-    protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+    protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
 
     protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
 
@@ -104,26 +104,13 @@
     private VersionSpecificRegistry _registry =
         MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
 
-    /**
-     * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for
-     * test
-     */
-    public AMQProtocolSession()
-    {
-        _protocolHandler = null;
-        _minaProtocolSession = null;
-        _stateManager = new AMQStateManager(this);
-    }
+    private final AMQConnection _connection;
+
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
     {
-        _protocolHandler = protocolHandler;
-        _minaProtocolSession = protocolSession;
-        // properties of the connection are made available to the event handlers
-        _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
-        // fixme - real value needed
-        _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        _stateManager = new AMQStateManager(this);
+        this(protocolHandler,protocolSession,connection, new AMQStateManager());
+
     }
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
@@ -138,6 +125,7 @@
         _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         _stateManager = stateManager;
         _stateManager.setProtocolSession(this);
+        _connection = connection;
 
     }
 
@@ -305,11 +293,16 @@
      */
     private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
     {
-        AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+        AMQSession session = getSession(channelId);
         session.messageReceived(msg);
         _channelId2UnprocessedMsgMap.remove(channelId);
     }
 
+    protected AMQSession getSession(int channelId)
+    {
+        return _connection.getSession(channelId);
+    }
+
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling
      * getProtocolSession().write().
@@ -335,32 +328,6 @@
         }
     }
 
-    public void addSessionByChannel(int channelId, AMQSession session)
-    {
-        if (channelId <= 0)
-        {
-            throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
-        }
-
-        if (session == null)
-        {
-            throw new IllegalArgumentException("Attempt to register a null session");
-        }
-
-        _logger.debug("Add session with channel id  " + channelId);
-        _channelId2SessionMap.put(channelId, session);
-    }
-
-    public void removeSessionByChannel(int channelId)
-    {
-        if (channelId <= 0)
-        {
-            throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
-        }
-
-        _logger.debug("Removing session with channelId " + channelId);
-        _channelId2SessionMap.remove(channelId);
-    }
 
     /**
      * Starts the process of closing a session
@@ -393,11 +360,11 @@
      */
     public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
     {
-        final Integer chId = channelId;
+
         // if this is not a response to an earlier request to close the channel
-        if (_closingChannels.remove(chId) == null)
+        if (_closingChannels.remove(channelId) == null)
         {
-            final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+            final AMQSession session = getSession(channelId);
             try
             {
                 session.closed(new AMQException(code, text));
@@ -469,8 +436,7 @@
 
     public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
     {
-        final Integer chId = channelId;
-        final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+        final AMQSession session = getSession(channelId);
 
         session.confirmConsumerCancelled(consumerTag);
     }

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Mon Apr 30 05:24:41 2007
@@ -32,9 +32,6 @@
 {
     private static class AMQProtSession extends AMQProtocolSession
     {
-        public AMQProtSession()
-        {
-        }
 
         public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
         {