You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/30 14:24:42 UTC
svn commit: r533721 - in /incubator/qpid/branches/M2/java/client/src:
main/java/org/apache/qpid/client/ main/java/org/apache/qpid/client/protocol/
test/java/org/apache/qpid/test/unit/client/protocol/
Author: rgodfrey
Date: Mon Apr 30 05:24:41 2007
New Revision: 533721
URL: http://svn.apache.org/viewvc?view=rev&rev=533721
Log:
QPID-476 : Remove duplicate map of channelId to session
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Apr 30 05:24:41 2007
@@ -96,7 +96,8 @@
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>();
+
private String _clientName;
@@ -508,7 +509,7 @@
AMQSession session =
new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
- _protocolHandler.addSessionByChannel(channelId, session);
+ //_protocolHandler.addSessionByChannel(channelId, session);
registerSession(channelId, session);
boolean success = false;
@@ -527,7 +528,6 @@
{
if (!success)
{
- _protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
}
}
@@ -589,7 +589,6 @@
}
catch (AMQException e)
{
- _protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e);
}
@@ -1136,7 +1135,7 @@
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
- _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
@@ -1222,5 +1221,11 @@
public void performConnectionTask(Runnable task)
{
_taskPool.execute(task);
+ }
+
+
+ public AMQSession getSession(int channelId)
+ {
+ return _sessions.get(channelId);
}
}
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Apr 30 05:24:41 2007
@@ -429,17 +429,7 @@
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry)
- {
- this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
@@ -493,15 +483,7 @@
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry());
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
@@ -796,7 +778,7 @@
amqe = new AMQException("Closing session forcibly", e);
}
_connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ closeProducersAndConsumers(amqe);
}
}
@@ -809,6 +791,7 @@
_closed.set(true);
_connection.deregisterSession(_channelId);
markClosedProducersAndConsumers();
+
}
private void markClosedProducersAndConsumers()
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Apr 30 05:24:41 2007
@@ -490,27 +490,7 @@
new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
}
- /**
- * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
- * handler will ensure that messages are delivered to the consumer(s) on that session.
- *
- * @param channelId the channel id of the session
- * @param session the session instance.
- */
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- _protocolSession.addSessionByChannel(channelId, session);
- }
- /**
- * Convenience method to deregister an AMQSession with the protocol handler.
- *
- * @param channelId then channel id of the session
- */
- public void removeSessionByChannel(int channelId)
- {
- _protocolSession.removeSessionByChannel(channelId);
- }
public void closeSession(AMQSession session) throws AMQException
{
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Apr 30 05:24:41 2007
@@ -85,7 +85,7 @@
protected final AMQProtocolHandler _protocolHandler;
/** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+ protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -104,26 +104,13 @@
private VersionSpecificRegistry _registry =
MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
- /**
- * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for
- * test
- */
- public AMQProtocolSession()
- {
- _protocolHandler = null;
- _minaProtocolSession = null;
- _stateManager = new AMQStateManager(this);
- }
+ private final AMQConnection _connection;
+
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _stateManager = new AMQStateManager(this);
+ this(protocolHandler,protocolSession,connection, new AMQStateManager());
+
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
@@ -138,6 +125,7 @@
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
+ _connection = connection;
}
@@ -305,11 +293,16 @@
*/
private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
{
- AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+ AMQSession session = getSession(channelId);
session.messageReceived(msg);
_channelId2UnprocessedMsgMap.remove(channelId);
}
+ protected AMQSession getSession(int channelId)
+ {
+ return _connection.getSession(channelId);
+ }
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -335,32 +328,6 @@
}
}
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
- }
-
- if (session == null)
- {
- throw new IllegalArgumentException("Attempt to register a null session");
- }
-
- _logger.debug("Add session with channel id " + channelId);
- _channelId2SessionMap.put(channelId, session);
- }
-
- public void removeSessionByChannel(int channelId)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
- }
-
- _logger.debug("Removing session with channelId " + channelId);
- _channelId2SessionMap.remove(channelId);
- }
/**
* Starts the process of closing a session
@@ -393,11 +360,11 @@
*/
public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
{
- final Integer chId = channelId;
+
// if this is not a response to an earlier request to close the channel
- if (_closingChannels.remove(chId) == null)
+ if (_closingChannels.remove(channelId) == null)
{
- final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
try
{
session.closed(new AMQException(code, text));
@@ -469,8 +436,7 @@
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
- final Integer chId = channelId;
- final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
session.confirmConsumerCancelled(consumerTag);
}
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Mon Apr 30 05:24:41 2007
@@ -32,9 +32,6 @@
{
private static class AMQProtSession extends AMQProtocolSession
{
- public AMQProtSession()
- {
- }
public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{