You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/25 22:00:24 UTC
svn commit: r499968 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/protocol/
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client...
Author: kpvdr
Date: Thu Jan 25 13:00:22 2007
New Revision: 499968
URL: http://svn.apache.org/viewvc?view=rev&rev=499968
Log:
Added a mechanism to track connection IDs for logging and debugging purposes. Changed the format of log/debug messages for RequestManager and ResponseManager.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Jan 25 13:00:22 2007
@@ -137,8 +137,8 @@
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _requestManager = new RequestManager(channelId, _session, true);
- _responseManager = new ResponseManager(channelId, methodListener, _session, true);
+ _requestManager = new RequestManager(_session.getConnectionId(), channelId, _session, true);
+ _responseManager = new ResponseManager(_session.getConnectionId(), channelId, methodListener, _session, true);
_txnBuffer = new TxnBuffer(_messageStore);
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -74,6 +74,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQMinaProtocolSession implements AMQProtocolSession,
ProtocolVersionList,
@@ -117,6 +118,10 @@
private byte _minor;
private FieldTable _clientProperties;
+ // Keeps a tally of connections for logging and debugging
+ private static AtomicInteger _ConnectionId;
+ static { _ConnectionId = new AtomicInteger(0); }
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -127,6 +132,7 @@
AMQCodecFactory codecFactory)
throws AMQException
{
+ _ConnectionId.incrementAndGet();
_stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
@@ -144,6 +150,7 @@
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
+ _ConnectionId.incrementAndGet();
_stateManager = stateManager;
_minaProtocolSession = session;
session.setAttachment(this);
@@ -699,5 +706,10 @@
if (!versionEquals(methodBody.getMajor(), methodBody.getMinor())) {
throw new RuntimeException("MethodBody version did not match version of current session.");
}
+ }
+
+ public int getConnectionId()
+ {
+ return _ConnectionId.get();
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -149,4 +149,5 @@
byte getMinor();
boolean versionEquals(byte major, byte minor);
void checkMethodBodyVersion(AMQMethodBody methodBody);
+ int getConnectionId();
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jan 25 13:00:22 2007
@@ -78,7 +78,7 @@
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
-
+
private AtomicInteger _idFactory = new AtomicInteger(0);
/**
@@ -155,6 +155,10 @@
private AMQException _lastAMQException = null;
+ // Keeps a tally of connections for logging and debugging
+ private static AtomicInteger _ConnectionId;
+ static { _ConnectionId = new AtomicInteger(0); }
+
/*
* The connection meta data
*/
@@ -200,7 +204,7 @@
public AMQConnection(ConnectionURL connectionURL) throws AMQException
{
_logger.info("Connection:" + connectionURL);
-
+ _ConnectionId.incrementAndGet();
if (connectionURL == null)
{
throw new IllegalArgumentException("Connection must be specified");
@@ -1019,5 +1023,10 @@
new StringRefAddr(AMQConnection.class.getName(), toURL()),
AMQConnectionFactory.class.getName(),
null); // factory location
+ }
+
+ public int getConnectionId()
+ {
+ return _ConnectionId.get();
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Jan 25 13:00:22 2007
@@ -576,4 +576,9 @@
{
_failoverState = failoverState;
}
+
+ public int getConnectionId()
+ {
+ return _connection.getConnectionId();
+ }
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -106,6 +106,8 @@
*/
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
+
+ protected int _ConnectionId;
/**
* No-arg constructor for use by test subclass - has to initialise final vars
@@ -118,8 +120,9 @@
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+ _ConnectionId = 0;
+ _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -131,8 +134,9 @@
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+ _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId();
+ _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
@@ -146,8 +150,9 @@
_stateManager.setProtocolSession(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+ _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId();
+ _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
}
public void init()
@@ -379,12 +384,11 @@
// Add request and response handlers, one per channel, if they do not already exist
if (_channelId2RequestMgrMap.get(channelId) == null)
{
- _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false));
+ _channelId2RequestMgrMap.put(channelId, new RequestManager(_ConnectionId, channelId, this, false));
}
if (_channelId2ResponseMgrMap.get(channelId) == null)
{
-
- _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false));
+ _channelId2ResponseMgrMap.put(channelId, new ResponseManager(_ConnectionId, channelId, _stateManager, this, false));
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Thu Jan 25 13:00:22 2007
@@ -40,6 +40,7 @@
* to be known.
*/
private boolean serverFlag;
+ private int connectionId;
/**
* Request and response frames must have a requestID and responseID which
@@ -55,11 +56,12 @@
private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
- public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
+ public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.protocolWriter = protocolWriter;
this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>();
@@ -77,11 +79,11 @@
protocolWriter.writeFrame(requestFrame);
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
- " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
- // " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ // "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
return requestId;
}
@@ -92,11 +94,11 @@
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
- " " + responseBody + "; " + responseBody.getMethodPayload());
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+ responseBody + "; " + responseBody.getMethodPayload());
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
- // " " + responseBody + "; " + responseBody.getMethodPayload());
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+ // responseBody + "; " + responseBody.getMethodPayload());
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
AMQMethodListener methodListener = requestSentMap.get(requestId);
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Thu Jan 25 13:00:22 2007
@@ -43,6 +43,7 @@
* to be known.
*/
private boolean serverFlag;
+ private int connectionId;
private int maxAccumulatedResponses = 20; // Default
// private Class currentResponseMethodBodyClass;
@@ -83,13 +84,14 @@
private ConcurrentHashMap<Long, ResponseStatus> responseMap;
- public ResponseManager(int channel, AMQMethodListener methodListener,
+ public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener,
AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.methodListener = methodListener;
this.protocolWriter = protocolWriter;
this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
// currentResponseMethodBodyClass = null;
@@ -103,11 +105,11 @@
long requestId = requestBody.getRequestId();
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
- " " + requestBody + "; " + requestBody.getMethodPayload());
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
+ requestBody + "; " + requestBody.getMethodPayload());
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
- // " " + requestBody + "; " + requestBody.getMethodPayload());
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
+ // requestBody + "; " + requestBody.getMethodPayload());
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
@@ -122,11 +124,11 @@
{
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
- " Res[# " + requestId + "]; " + responseMethodBody);
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
- // " Res[# " + requestId + "]; " + responseMethodBody);
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ // "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
throw new RequestResponseMappingException(requestId,
Modified: incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -35,6 +35,7 @@
import javax.security.sasl.SaslServer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A protocol session that can be used for testing purposes.
@@ -45,8 +46,13 @@
private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ // Keeps a tally of connections for logging and debugging
+ private static AtomicInteger _ConnectionId;
+ static { _ConnectionId = new AtomicInteger(0); }
+
public MockProtocolSession(MessageStore messageStore)
{
+ _ConnectionId.incrementAndGet();
_messageStore = messageStore;
}
@@ -221,4 +227,9 @@
// TODO Auto-generated method stub
}
+
+ public int getConnectionId()
+ {
+ return _ConnectionId.get();
+ }
}