You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/03/23 17:02:52 UTC
svn commit: r521782 - in /incubator/qpid/branches/M2/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/protocol/
perftests/src/main/java/org/apache/qpid/client/message/
perftests/src/main/java/org/apache/qpi...
Author: bhupendrab
Date: Fri Mar 23 09:02:51 2007
New Revision: 521782
URL: http://svn.apache.org/viewvc?view=rev&rev=521782
Log:
QPID-420 (merged from trunk) And r518998:518999 and r520846:520850
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=521782&r1=521781&r2=521782
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Fri Mar 23 09:02:51 2007
@@ -171,7 +171,7 @@
}
catch (AMQException ex)
{
- throw new MBeanException(ex,"Error in creating queue " + queueName);
+ throw new MBeanException(new JMException(ex.getMessage()),"Error in creating queue " + queueName);
}
}
@@ -202,7 +202,7 @@
}
catch (AMQException ex)
{
- throw new MBeanException(ex, ex.toString());
+ throw new MBeanException(new JMException(ex.getMessage()), "Error in deleting queue " + queueName);
}
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=521782&r1=521781&r2=521782
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Mar 23 09:02:51 2007
@@ -74,6 +74,8 @@
private AMQShortString _contextKey;
+ private AMQShortString _clientVersion = null;
+
private VirtualHost _virtualHost;
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
@@ -667,9 +669,16 @@
public void setClientProperties(FieldTable clientProperties)
{
_clientProperties = clientProperties;
- if ((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+ if (_clientProperties != null)
{
- setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
+ if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)
+ {
+ setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
+ }
+ if (_clientProperties.getString(ClientProperties.version.toString()) != null)
+ {
+ _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
+ }
}
}
@@ -745,5 +754,8 @@
{
return _authorizedID;
}
-
+ public String getClientVersion()
+ {
+ return _clientVersion == null ? null : _clientVersion.toString();
+ }
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=521782&r1=521781&r2=521782
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Mar 23 09:02:51 2007
@@ -56,6 +56,7 @@
{
private AMQMinaProtocolSession _session = null;
private String _name = null;
+
//openmbean data types for representing the channel attributes
private final static String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
private final static String[] _indexNames = {_channelAtttibuteNames[0]};
@@ -95,12 +96,26 @@
*/
private static void init() throws OpenDataException
{
-
_channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
_channelAtttibuteNames, _channelAttributeTypes);
_channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
+ public String getClientId()
+ {
+ return _session.getContextKey() == null ? null : _session.getContextKey().toString();
+ }
+
+ public String getAuthorizedId()
+ {
+ return _session.getAuthorizedID();
+ }
+
+ public String getVersion()
+ {
+ return _session.getClientVersion() == null ? null : _session.getClientVersion().toString();
+ }
+
public Date getLastIoTime()
{
return new Date(_session.getIOSession().getLastIoTime());
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=521782&r1=521781&r2=521782
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Fri Mar 23 09:02:51 2007
@@ -41,6 +41,15 @@
{
static final String TYPE = "Connection";
+ @MBeanAttribute(name = "ClientId", description = "Client Id")
+ String getClientId();
+
+ @MBeanAttribute(name = "AuthorizedId", description = "User Name")
+ String getAuthorizedId();
+
+ @MBeanAttribute(name = "Version", description = "Client Version")
+ String getVersion();
+
/**
* Tells the remote address of this connection.
* @return remote address
Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?view=diff&rev=521782&r1=521781&r2=521782
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java Fri Mar 23 09:02:51 2007
@@ -103,7 +103,7 @@
{
StringBuffer buf = new StringBuffer(size);
int count = 0;
- while (count < size)
+ while (count <= (size - MESSAGE_DATA_BYTES.length()))
{
buf.append(MESSAGE_DATA_BYTES);
count += MESSAGE_DATA_BYTES.length();
Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=521782&r1=521781&r2=521782
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Mar 23 09:02:51 2007
@@ -497,11 +497,12 @@
boolean transacted = config.isTransacted();
boolean persistent = config.usePersistentMessages();
int messageSize = (config.getPayload() != 0) ? config.getPayload() : MESSAGE_SIZE_DEAFULT;
- // int messageCount = config.getMessages();
+ int messageCount = config.getMessages();
int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DESTINATION_COUNT_DEFAULT;
int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : TX_BATCH_SIZE_DEFAULT;
int rate = (config.getRate() != 0) ? config.getRate() : RATE_DEFAULT;
boolean pubsub = config.isPubSub();
+ long timeout = (config.getTimeout() != 0) ? config.getTimeout() : TIMEOUT_DEFAULT;
String destName = config.getDestination();
if (destName == null)
@@ -561,10 +562,20 @@
// Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
pingProducer.getConnection().setExceptionListener(pingProducer);
- // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(pingProducer);
- pingThread.run();
- pingThread.join();
+ // If messageount is 0, then continue sending
+ if (messageCount == 0)
+ {
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+ Thread pingThread = new Thread(pingProducer);
+ pingThread.start();
+ pingThread.join();
+ }
+ else
+ {
+ // This feature is needed, when we want to send fix no of messages
+ pingProducer.pingLoop(messageCount, timeout);
+ }
+ pingProducer.close();
}
/**
@@ -963,7 +974,7 @@
/**
* The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
*/
- public void pingLoop()
+ public void pingLoop(int pingCount, long timeout)
{
try
{
@@ -972,7 +983,7 @@
msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT);
+ pingAndWaitForReply(msg, pingCount, timeout);
}
catch (JMSException e)
{
@@ -984,6 +995,11 @@
_publish = false;
_logger.debug("There was an interruption: " + e.getMessage(), e);
}
+ }
+
+ public void pingLoop()
+ {
+ pingLoop(TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT);
}
public Destination getReplyDestination()
Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/topic/Config.java?view=diff&rev=521782&r1=521781&r2=521782
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/topic/Config.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/topic/Config.java Fri Mar 23 09:02:51 2007
@@ -51,6 +51,7 @@
private int batchSize;
private int rate;
private boolean ispubsub;
+ private long timeout;
public Config()
{
@@ -161,6 +162,16 @@
this.delay = delay;
}
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ public void setTimeout(long time)
+ {
+ this.timeout = time;
+ }
+
public String getClientId()
{
return clientId;
@@ -284,6 +295,10 @@
else if("-destinationname".equalsIgnoreCase(key))
{
destinationName = value;
+ }
+ else if("-timeout".equalsIgnoreCase(key))
+ {
+ setTimeout(parseLong("Bad timeout data", value));
}
else
{