You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/18 10:11:14 UTC
svn commit: r488163 [2/4] - in
/incubator/qpid/branches/new_persistence/java: ./ broker/ broker/bin/
broker/etc/ broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/ser...
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Dec 18 01:10:59 2006
@@ -36,7 +36,10 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
-import javax.management.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.*;
import javax.security.sasl.SaslServer;
@@ -97,66 +100,35 @@
* augment the ManagedConnection interface and add the appropriate implementation here.
*/
@MBeanDescription("Management Bean for an AMQ Broker Connection")
- private final class ManagedAMQProtocolSession extends AMQManagedObject
- implements ManagedConnection
+ private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
{
private String _name = null;
- /**
- * Represents the channel attributes sent with channel data.
- */
- private String[] _channelAtttibuteNames = { "ChannelId",
- "Transactional",
- "DefaultQueue",
- "UnacknowledgedMessageCount"};
- private String[] _channelAttributeDescriptions = { "Channel Identifier",
- "is Channel Transactional?",
- "Default Queue Name",
- "Unacknowledged Message Count"};
- private OpenType[] _channelAttributeTypes = { SimpleType.INTEGER,
- SimpleType.BOOLEAN,
- SimpleType.STRING,
- SimpleType.INTEGER};
-
- private String[] _indexNames = { "ChannelId" }; //Channels in the list will be indexed according to channelId.
- private CompositeType _channelType = null; // represents the data type for channel data
- private TabularType _channelsType = null; // Datatype for list of channelsType
+ //openmbean data types for representing the channel attributes
+ private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
+ private String[] _indexNames = {_channelAtttibuteNames[0]};
+ private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
+ private CompositeType _channelType = null; // represents the data type for channel data
+ private TabularType _channelsType = null; // Data type for list of channels type
private TabularDataSupport _channelsList = null;
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public ManagedAMQProtocolSession() throws NotCompliantMBeanException
+ public ManagedAMQProtocolSession() throws JMException
{
super(ManagedConnection.class, ManagedConnection.TYPE);
init();
}
/**
- * initialises the CompositeTypes and TabularType attributes.
+ * initialises the openmbean data types
*/
- private void init()
+ private void init() throws OpenDataException
{
String remote = getRemoteAddress();
remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
-
- try
- {
- _channelType = new CompositeType("channel",
- "Channel Details",
- _channelAtttibuteNames,
- _channelAttributeDescriptions,
- _channelAttributeTypes);
-
- _channelsType = new TabularType("channelsType",
- "List of available channels",
- _channelType,
- _indexNames);
- }
- catch(OpenDataException ex)
- {
- // It should never occur.
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
+ _channelAtttibuteNames, _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
public Date getLastIoTime()
@@ -179,12 +151,12 @@
return _minaProtocolSession.getReadBytes();
}
- public Long getMaximumNumberOfAllowedChannels()
+ public Long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
}
- public void setMaximumNumberOfAllowedChannels(Long value)
+ public void setMaximumNumberOfChannels(Long value)
{
_maxNoOfChannels = value;
}
@@ -239,30 +211,25 @@
* @return list of channels in tabular form.
* @throws OpenDataException
*/
- public TabularData getChannels() throws OpenDataException
+ public TabularData channels() throws OpenDataException
{
_channelsList = new TabularDataSupport(_channelsType);
for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
{
AMQChannel channel = entry.getValue();
- Object[] itemValues = {channel.getChannelId(),
- channel.isTransactional(),
+ Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
(channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
channel.getUnacknowledgedMessageMap().size()};
- CompositeData channelData = new CompositeDataSupport(_channelType,
- _channelAtttibuteNames,
- itemValues);
-
+ CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
_channelsList.put(channelData);
}
return _channelsList;
}
-
- public void closeChannel(int id)
- throws Exception
+
+ public void closeChannel(int id) throws Exception
{
try
{
@@ -274,8 +241,7 @@
}
}
- public void closeConnection()
- throws Exception
+ public void closeConnection() throws Exception
{
try
{
@@ -290,13 +256,10 @@
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]
- {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
- String description = "An attribute of this MBean has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
- name,
- description);
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
return new MBeanNotificationInfo[] {info1};
}
@@ -304,14 +267,11 @@
private void checkForNotification()
{
int channelsCount = _channelMap.size();
- if (channelsCount >= getMaximumNumberOfAllowedChannels())
+ if (channelsCount >= getMaximumNumberOfChannels())
{
- Notification n = new Notification(
- MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
- this,
- ++_notificationSequenceNumber,
- System.currentTimeMillis(),
- "ChannelsCount = " + channelsCount + ", ChannelsCount has reached the threshold value");
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(),
+ "Channel count (" + channelsCount + ") has reached the threshold value");
_broadcaster.sendNotification(n);
}
@@ -347,10 +307,10 @@
{
return new ManagedAMQProtocolSession();
}
- catch(NotCompliantMBeanException ex)
+ catch(JMException ex)
{
- _logger.error("AMQProtocolSession MBean creation has failed.", ex);
- throw new AMQException("AMQProtocolSession MBean creation has failed.", ex);
+ _logger.error("AMQProtocolSession MBean creation has failed ", ex);
+ throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
}
}
@@ -389,6 +349,11 @@
int i = pv.length - 1;
_minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
// TODO: Close connection (but how to wait until message is sent?)
+ // ritchiem 2006-12-04 will this not do?
+// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+// future.join();
+// close connection
+
}
}
else
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Mon Dec 18 01:10:59 2006
@@ -41,83 +41,57 @@
static final String TYPE = "Connection";
/**
- * channel details of all the channels opened for this connection.
- * @return general channel details
- * @throws IOException
- * @throws JMException
+ * Tells the remote address of this connection.
+ * @return remote address
*/
- @MBeanAttribute(name="Channels",
- description="channel details of all the channels opened for this connection")
- TabularData getChannels() throws IOException, JMException;
+ @MBeanAttribute(name="RemoteAddress", description=TYPE + " Address")
+ String getRemoteAddress();
/**
* Tells the last time, the IO operation was done.
* @return last IO time.
*/
- @MBeanAttribute(name="LastIOTime",
- description="The last time, the IO operation was done")
+ @MBeanAttribute(name="LastIOTime", description="The last time, the IO operation was done")
Date getLastIoTime();
/**
- * Tells the remote address of this connection.
- * @return remote address
- */
- @MBeanAttribute(name="RemoteAddress",
- description="The remote address of this connection")
- String getRemoteAddress();
-
- /**
* Tells the total number of bytes written till now.
* @return number of bytes written.
*/
- @MBeanAttribute(name="WrittenBytes",
- description="The total number of bytes written till now")
+ @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
Long getWrittenBytes();
/**
* Tells the total number of bytes read till now.
* @return number of bytes read.
*/
- @MBeanAttribute(name="ReadBytes",
- description="The total number of bytes read till now")
+ @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
Long getReadBytes();
/**
- * Tells the maximum number of channels that can be opened using
- * this connection. This is useful in setting notifications or
+ * Threshold high value for no of channels. This is useful in setting notifications or
* taking required action if there are more channels being created.
- * @return maximum number of channels allowed to be created.
+ * @return threshold limit for no of channels
*/
- Long getMaximumNumberOfAllowedChannels();
+ Long getMaximumNumberOfChannels();
/**
- * Sets the maximum number of channels allowed to be created using
- * this connection.
+ * Sets the threshold high value for number of channels for a connection
* @param value
*/
- @MBeanAttribute(name="MaximumNumberOfAllowedChannels",
- description="The maximum number of channels that can be opened using this connection")
- void setMaximumNumberOfAllowedChannels(Long value);
+ @MBeanAttribute(name="MaximumNumberOfChannels", description="The threshold high value for number of channels for this connection")
+ void setMaximumNumberOfChannels(Long value);
//********** Operations *****************//
/**
- * Closes all the related channels and unregisters this connection from managed objects.
- */
- @MBeanOperation(name="closeConnection",
- description="Closes this connection and all related channels",
- impact= MBeanOperationInfo.ACTION)
- void closeConnection() throws Exception;
-
- /**
- * Unsubscribes the consumers and unregisters the channel from managed objects.
+ * channel details of all the channels opened for this connection.
+ * @return general channel details
+ * @throws IOException
+ * @throws JMException
*/
- @MBeanOperation(name="closeChannel",
- description="Closes the channel with given channeld and" +
- "connected consumers will be unsubscribed",
- impact= MBeanOperationInfo.ACTION)
- void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
- throws Exception;
+ @MBeanOperation(name="channels", description="Channel details for this connection")
+ TabularData channels() throws IOException, JMException;
/**
* Commits the transactions if the channel is transactional.
@@ -125,8 +99,8 @@
* @throws JMException
*/
@MBeanOperation(name="commitTransaction",
- description="Commits the transactions for given channelID, if the channel is transactional",
- impact= MBeanOperationInfo.ACTION)
+ description="Commits the transactions for given channel Id, if the channel is transactional",
+ impact= MBeanOperationInfo.ACTION)
void commitTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
/**
@@ -135,7 +109,24 @@
* @throws JMException
*/
@MBeanOperation(name="rollbackTransactions",
- description="Rollsback the transactions for given channelId, if the channel is transactional",
- impact= MBeanOperationInfo.ACTION)
+ description="Rollsback the transactions for given channel Id, if the channel is transactional",
+ impact= MBeanOperationInfo.ACTION)
void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
+
+ /**
+ * Unsubscribes the consumers and unregisters the channel from managed objects.
+ */
+ @MBeanOperation(name="closeChannel",
+ description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
+ impact= MBeanOperationInfo.ACTION)
+ void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
+ throws Exception;
+
+ /**
+ * Closes all the related channels and unregisters this connection from managed objects.
+ */
+ @MBeanOperation(name="closeConnection",
+ description="Closes this connection and all related channels",
+ impact= MBeanOperationInfo.ACTION)
+ void closeConnection() throws Exception;
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Dec 18 01:10:59 2006
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.*;
@@ -91,19 +92,19 @@
private final AMQQueueMBean _managedObject;
/**
- * max allowed size of a single message(in KBytes).
+ * max allowed size(KB) of a single message
*/
- private long _maxAllowedMessageSize = 10000; // 10 MB
+ private long _maxMessageSize = 10000;
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxAllowedMessageCount = 10000;
+ private Integer _maxMessageCount = 10000;
/**
- * max allowed size in KBytes for all the messages combined together in a queue.
+ * max queue depth(KB) for the queue
*/
- private long _queueDepth = 10000000; // 10 GB
+ private long _maxQueueDepth = 10000000;
/**
* total messages received by the queue since startup.
@@ -123,83 +124,45 @@
private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
private String _queueName = null;
+ // OpenMBean data types for viewMessages method
+ private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
+ private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+ private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+ private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
+
+ // OpenMBean data types for viewMessageContent method
+ private CompositeType _msgContentType = null;
+ private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
- // AMQ message attribute names
- private String[] _msgAttributeNames = {"MessageId",
- "Header",
- "Size",
- "Redelivered"
- };
- // AMQ Message attribute descriptions.
- private String[] _msgAttributeDescriptions = {"Message Id",
- "Header",
- "Message size in bytes",
- "Redelivered"
- };
-
- private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private String[] _msgAttributeIndex = {"MessageId"}; // Messages will be indexed according to the messageId.
- private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
-
- private CompositeType _msgContentType = null; // For message content
- private String[] _msgContentAttributes = {"MessageId",
- "MimeType",
- "Encoding",
- "Content"
- };
- private String[] _msgContentDescriptions = {"Message Id",
- "MimeType",
- "Encoding",
- "Message content"
- };
- private OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
-
- @MBeanConstructor("Creates an MBean exposing an AMQQueue.")
- public AMQQueueMBean() throws NotCompliantMBeanException
+ @MBeanConstructor("Creates an MBean exposing an AMQQueue")
+ public AMQQueueMBean() throws JMException
{
super(ManagedQueue.class, ManagedQueue.TYPE);
init();
}
- private void init()
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
{
_queueName = jmxEncode(new StringBuffer(_name), 0).toString();
- try
- {
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("MessageContent",
- "AMQ Message Content",
- _msgContentAttributes,
- _msgContentDescriptions,
- _msgContentAttributeTypes);
-
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
-
- _messageDataType = new CompositeType("Message",
- "AMQ Message",
- _msgAttributeNames,
- _msgAttributeDescriptions,
- _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages",
- "List of messages",
- _messageDataType,
- _msgAttributeIndex);
- }
- catch (OpenDataException ex)
- {
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
+ _msgContentAttributes, _msgContentAttributeTypes);
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+
+ _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
}
public String getObjectInstanceName()
@@ -234,12 +197,12 @@
public Long getMaximumMessageSize()
{
- return _maxAllowedMessageSize;
+ return _maxMessageSize;
}
public void setMaximumMessageSize(Long value)
{
- _maxAllowedMessageSize = value;
+ _maxMessageSize = value;
}
public Integer getConsumerCount()
@@ -259,27 +222,29 @@
public Integer getMaximumMessageCount()
{
- return _maxAllowedMessageCount;
+ return _maxMessageCount;
}
public void setMaximumMessageCount(Integer value)
{
- _maxAllowedMessageCount = value;
+ _maxMessageCount = value;
}
- public Long getQueueDepth()
+ public Long getMaximumQueueDepth()
{
- return _queueDepth;
+ return _maxQueueDepth;
}
// Sets the queue depth, the max queue size
- public void setQueueDepth(Long value)
+ public void setMaximumQueueDepth(Long value)
{
- _queueDepth = value;
+ _maxQueueDepth = value;
}
- // Returns the size of messages in the queue
- public Long getQueueSize() throws AMQException
+ /**
+ * returns the size of messages(KB) in the queue.
+ */
+ public Long getQueueDepth()
{
/* TODO: this must return a maintained count not
* iterate through all messages
@@ -291,17 +256,19 @@
return 0l;
}
- long queueSize = 0;
+ long queueDepth = 0;
for (AMQMessage message : list)
{
- queueSize = queueSize + getMessageSize(message);
+ queueDepth = queueDepth + getMessageSize(message);
}
- return new Long(Math.round(queueSize / 100));
+ return (long)Math.round(queueDepth / 1000);
*/
}
// Operations
- // calculates the size of an AMQMessage
+ /**
+ * returns size of message in bytes
+ */
private long getMessageSize(AMQMessage msg) throws AMQException
{
if (msg == null)
@@ -312,41 +279,40 @@
return msg.getContentHeaderBody().bodySize;
}
- // Checks if there is any notification to be send to the listeners
+ /**
+ * Checks if there is any notification to be sent to the listeners
+ */
private void checkForNotification(AMQMessage msg) throws AMQException
{
- // Check for message count
+ // Check for threshold message count
Integer msgCount = getMessageCount();
if (msgCount >= getMaximumMessageCount())
{
- notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+ notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
}
- // Check for received message size
+ // Check for threshold message size
long messageSize = getMessageSize(msg);
- if (messageSize >= getMaximumMessageSize())
+ if (messageSize >= _maxMessageSize)
{
- notifyClients("MessageSize = " + messageSize + ", Message size (MessageID=" + msg.getMessageId() +
- ")is higher than the threshold value");
+ notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
}
- // Check for queue size in bytes
- long queueSize = getQueueSize();
- if (queueSize >= getQueueDepth())
+ // Check for threshold queue depth (in KB)
+ long queueDepth = getQueueDepth();
+ if (queueDepth >= _maxQueueDepth)
{
- notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+ notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
}
}
- // Send the notification to the listeners
+ /**
+ * Sends the notification to the listeners
+ */
private void notifyClients(String notificationMsg)
{
- Notification n = new Notification(
- MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
- this,
- ++_notificationSequenceNumber,
- System.currentTimeMillis(),
- notificationMsg);
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
@@ -375,10 +341,12 @@
}
}
- public CompositeData viewMessageContent(long msgId) throws JMException, AMQException
+ /**
+ * returns message content as byte array and related attributes for the given message id.
+ */
+ public CompositeData viewMessageContent(long msgId) throws JMException
{
List<AMQMessage> list = _deliveryMgr.getMessages();
- CompositeData messageContent = null;
AMQMessage msg = null;
for (AMQMessage message : list)
{
@@ -391,70 +359,65 @@
if (msg != null)
{
- // get message content
- Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
- List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
+ try
{
- ContentBody body = cBodies.next();
- if (body.getSize() != 0)
+ // get message content
+ Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ while (cBodies.hasNext())
{
- ByteBuffer slice = body.payload.slice();
- for (int j = 0; j < slice.limit(); j++)
+ ContentBody body = cBodies.next();
+ if (body.getSize() != 0)
{
- msgContent.add(slice.get());
+ ByteBuffer slice = body.payload.slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
}
}
- }
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = headerProperties.getContentType();
- String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+ String mimeType = headerProperties.getContentType();
+ String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
- messageContent = new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+ }
+ catch (AMQException e)
+ {
+ throw new JMException(e.getMessage());
+ }
}
else
{
- throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
+ throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
-
- return messageContent;
}
/**
- * Returns the messages stored in this queue in tabular form.
- *
- * @param beginIndex
- * @param endIndex
- * @return AMQ messages in tabular form.
- * @throws JMException
+ * Returns the header contents of the messages stored in this queue in tabular form.
*/
public TabularData viewMessages(int beginIndex, int endIndex) throws JMException, AMQException
{
if ((beginIndex > endIndex) || (beginIndex < 1))
{
- throw new JMException("FromIndex = " + beginIndex + ", ToIndex = " + endIndex +
- "\nFromIndex should be greater than 0 and less than ToIndex");
+ throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex +
+ "\nFrom Index should be greater than 0 and less than To Index");
}
List<AMQMessage> list = _deliveryMgr.getMessages();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
- if (beginIndex > list.size())
- {
- return _messageList;
- }
- endIndex = endIndex < list.size() ? endIndex : list.size();
-
- for (int i = beginIndex; i <= endIndex; i++)
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
{
AMQMessage msg = list.get(i - 1);
- long size = msg.getContentHeaderBody().bodySize;
-
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ long size = headerBody.bodySize;
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties;
List<String> headerAttribsList = new ArrayList<String>();
headerAttribsList.add("App Id=" + headerProperties.getAppId());
headerAttribsList.add("MimeType=" + headerProperties.getContentType());
@@ -462,13 +425,9 @@
headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
headerAttribsList.add(headerProperties.toString());
- Object[] itemValues = {msg.getMessageId(),
- headerAttribsList.toArray(new String[0]),
- size, msg.isRedelivered()};
-
- CompositeData messageData = new CompositeDataSupport(_messageDataType,
- _msgAttributeNames,
- itemValues);
+ Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]),
+ headerBody.bodySize, msg.isRedelivered()};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
}
@@ -476,20 +435,15 @@
}
/**
- * Creates all the notifications this MBean can send.
- *
- * @return Notifications broadcasted by this MBean.
+ * returns Notifications sent by this MBean.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]
- {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
- String description = "An attribute of this MBean has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
- name,
- description);
+ String description = "Either Message count or Queue depth or Message size has reached threshold high value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
return new MBeanNotificationInfo[]{info1};
}
@@ -591,9 +545,9 @@
{
return new AMQQueueMBean();
}
- catch (NotCompliantMBeanException ex)
+ catch (JMException ex)
{
- throw new AMQException("AMQQueue MBean creation has failed.", ex);
+ throw new AMQException("AMQQueue MBean creation has failed ", ex);
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Mon Dec 18 01:10:59 2006
@@ -46,16 +46,48 @@
* @return the name of the managedQueue.
* @throws IOException
*/
- @MBeanAttribute(name="Name", description = "Name of the " + TYPE)
+ @MBeanAttribute(name="Name", description = TYPE + " Name")
String getName() throws IOException;
/**
- * Tells whether this ManagedQueue is durable or not.
- * @return true if this ManagedQueue is a durable queue.
+ * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
+ * @return number of undelivered message in the Queue.
* @throws IOException
*/
- @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
- boolean isDurable() throws IOException;
+ @MBeanAttribute(name="MessageCount", description = "Total number of undelivered messages on the queue")
+ Integer getMessageCount() throws IOException;
+
+ /**
+ * Tells the total number of messages received by the queue since startup.
+ * @return total number of messages received.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ReceivedMessageCount", description="The total number of messages receieved by the queue since startup")
+ Long getReceivedMessageCount() throws IOException;
+
+ /**
+ * Size of messages in the queue
+ * @return the size (in KB, per the attribute description) of the messages in the queue
+ * @throws IOException
+ */
+ @MBeanAttribute(name="QueueDepth", description="Size of messages(KB) in the queue")
+ Long getQueueDepth() throws IOException;
+
+ /**
+ * Returns the total number of active subscribers to the queue.
+ * @return the number of active subscribers
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
+ Integer getActiveConsumerCount() throws IOException;
+
+ /**
+ * Returns the total number of subscribers to the queue.
+ * @return the number of subscribers.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
+ Integer getConsumerCount() throws IOException;
/**
* Tells the Owner of the ManagedQueue.
@@ -66,21 +98,20 @@
String getOwner() throws IOException;
/**
- * Tells if the ManagedQueue is set to AutoDelete.
- * @return true if the ManagedQueue is set to AutoDelete.
+ * Tells whether this ManagedQueue is durable or not.
+ * @return true if this ManagedQueue is a durable queue.
* @throws IOException
*/
- @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
- boolean isAutoDelete() throws IOException;
+ @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
+ boolean isDurable() throws IOException;
/**
- * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
- * @return number of undelivered message in the Queue.
+ * Tells if the ManagedQueue is set to AutoDelete.
+ * @return true if the ManagedQueue is set to AutoDelete.
* @throws IOException
*/
- @MBeanAttribute(name="MessageCount",
- description = "Total number of undelivered messages on the queue")
- Integer getMessageCount() throws IOException;
+ @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
+ boolean isAutoDelete() throws IOException;
/**
* Returns the maximum size of a message (in kbytes) allowed to be accepted by the
@@ -99,36 +130,10 @@
* @param size maximum size of message.
* @throws IOException
*/
- @MBeanAttribute(name="MaximumMessageSize",
- description="Maximum size(KB) of a message allowed for this Queue")
+ @MBeanAttribute(name="MaximumMessageSize", description="Threshold high value(KB) for a message size")
void setMaximumMessageSize(Long size) throws IOException;
/**
- * Returns the total number of subscribers to the queue.
- * @return the number of subscribers.
- * @throws IOException
- */
- @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
- Integer getConsumerCount() throws IOException;
-
- /**
- * Returns the total number of active subscribers to the queue.
- * @return the number of active subscribers
- * @throws IOException
- */
- @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
- Integer getActiveConsumerCount() throws IOException;
-
- /**
- * Tells the total number of messages receieved by the queue since startup.
- * @return total number of messages received.
- * @throws IOException
- */
- @MBeanAttribute(name="ReceivedMessageCount",
- description="The total number of messages receieved by the queue since startup")
- Long getReceivedMessageCount() throws IOException;
-
- /**
* Tells the maximum number of messages that can be stored in the queue.
* This is useful in setting the notifications or taking required
* action is the number of message increase this limit.
@@ -142,27 +147,16 @@
* @param value the maximum number of messages allowed to be stored in the queue.
* @throws IOException
*/
- @MBeanAttribute(name="MaximumMessageCount",
- description="The maximum number of messages allowed to be stored in the queue")
+ @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue")
void setMaximumMessageCount(Integer value) throws IOException;
/**
- * Size of messages in the queue
- * @return
- * @throws IOException
- */
- @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
- Long getQueueSize() throws IOException, AMQException;
-
- /**
- * Tells the maximum size of all the messages combined together,
- * that can be stored in the queue. This is useful for setting notifications
- * or taking required action if the size of messages stored in the queue
- * increases over this limit.
- * @return maximum size of the all the messages allowed for the queue.
+ * This is useful for setting notifications or taking required action if the size of messages
+ * stored in the queue increases over this limit.
+ * @return threshold high value for Queue Depth
* @throws IOException
*/
- Long getQueueDepth() throws IOException;
+ Long getMaximumQueueDepth() throws IOException;
/**
* Sets the maximum size of all the messages together, that can be stored
@@ -170,9 +164,8 @@
* @param value
* @throws IOException
*/
- @MBeanAttribute(name="QueueDepth",
- description="The size(KB) of all the messages together, that can be stored in the queue")
- void setQueueDepth(Long value) throws IOException;
+ @MBeanAttribute(name="MaximumQueueDepth", description="The threshold high value(KB) for Queue Depth")
+ void setMaximumQueueDepth(Long value) throws IOException;
@@ -189,19 +182,22 @@
* @throws JMException
*/
@MBeanOperation(name="viewMessages",
- description="shows messages in this queue with given indexes. eg. from index 1 - 100")
+ description="Message headers for messages in this queue within given index range. eg. from index 1 - 100")
TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex,
@MBeanOperationParameter(name="to index", description="to index")int toIndex)
throws IOException, JMException, AMQException;
+ @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id")
+ CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
+ throws IOException, JMException;
+
/**
* Deletes the first message from top.
* @throws IOException
* @throws JMException
*/
- @MBeanOperation(name="deleteMessageFromTop",
- description="Deletes the first message from top",
- impact= MBeanOperationInfo.ACTION)
+ @MBeanOperation(name="deleteMessageFromTop", description="Deletes the first message from top",
+ impact= MBeanOperationInfo.ACTION)
void deleteMessageFromTop() throws IOException, JMException;
/**
@@ -210,12 +206,8 @@
* @throws JMException
*/
@MBeanOperation(name="clearQueue",
- description="Clears the queue by deleting all the undelivered messages from the queue",
- impact= MBeanOperationInfo.ACTION)
+ description="Clears the queue by deleting all the undelivered messages from the queue",
+ impact= MBeanOperationInfo.ACTION)
void clearQueue() throws IOException, JMException;
- @MBeanOperation(name="viewMessageContent",
- description="Returns the message content along with MimeType and Encoding")
- CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
- throws IOException, JMException, AMQException;
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java Mon Dec 18 01:10:59 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.mina.common.ByteBuffer;
import javax.security.sasl.SaslServer;
@@ -54,7 +55,7 @@
{
try
{
- final FieldTable ft = new FieldTable(ByteBuffer.wrap(response), response.length);
+ final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
String username = (String) ft.get("LOGIN");
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", username);
Modified: incubator/qpid/branches/new_persistence/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/pom.xml?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/client/pom.xml Mon Dec 18 01:10:59 2006
@@ -38,7 +38,6 @@
<amqj.logging.level>warn</amqj.logging.level>
</properties>
-
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
@@ -49,7 +48,6 @@
<artifactId>qpid-broker</artifactId>
</dependency>
-
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
@@ -72,11 +70,11 @@
<artifactId>mina-filter-ssl</artifactId>
</dependency>
-
<dependency>
<groupId>jmscts</groupId>
<artifactId>jmscts</artifactId>
<version>0.5-b2</version>
+ <scope>test</scope>
<exclusions>
<exclusion>
<groupId>jms</groupId>
@@ -86,9 +84,12 @@
</dependency>
<dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
- <scope>test</scope>
</dependency>
</dependencies>
@@ -106,7 +107,7 @@
</property>
<property>
<name>amqj.logging.level</name>
- <value>WARN</value>
+ <value>${amqj.logging.level}</value>
</property>
<property>
<name>log4j.configuration</name>
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Mon Dec 18 01:10:59 2006
@@ -56,7 +56,7 @@
{
//todo this list of valid transports should be enumerated somewhere
if ((!(transport.equalsIgnoreCase("vm") ||
- transport.equalsIgnoreCase("tcp"))))
+ transport.equalsIgnoreCase("tcp"))))
{
if (transport.equalsIgnoreCase("localhost"))
{
@@ -65,7 +65,7 @@
}
else
{
- if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' )
+ if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/')
{
//Then most likely we have a host:port value
connection = new URI(DEFAULT_TRANSPORT + "://" + url);
@@ -88,7 +88,7 @@
if (transport == null)
{
URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
- " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
+ " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
}
setTransport(transport);
@@ -107,12 +107,45 @@
if (port == -1)
{
- // Another fix for Java 1.5 URI handling
+ // Fix for when there is port data but it is not automatically parseable by getPort().
String auth = connection.getAuthority();
- if (auth != null && auth.startsWith(":"))
+ if (auth != null && auth.contains(":"))
{
- setPort(Integer.parseInt(auth.substring(1)));
+ int start = auth.indexOf(":") + 1;
+ int end = start;
+ boolean looking = true;
+ boolean found = false;
+ //Walk the authority looking for a port value.
+ while (looking)
+ {
+ try
+ {
+ end++;
+ Integer.parseInt(auth.substring(start, end));
+
+ if (end >= auth.length())
+ {
+ looking = false;
+ found = true;
+ }
+ }
+ catch (NumberFormatException nfe)
+ {
+ looking = false;
+ }
+
+ }
+ if (found)
+ {
+ setPort(Integer.parseInt(auth.substring(start, end)));
+ }
+ else
+ {
+ URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+ "Illegal character in port number", connection.toString());
+ }
+
}
else
{
@@ -134,7 +167,7 @@
{
if (uris instanceof URLSyntaxException)
{
- throw (URLSyntaxException) uris;
+ throw(URLSyntaxException) uris;
}
URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
@@ -245,9 +278,9 @@
BrokerDetails bd = (BrokerDetails) o;
return _host.equalsIgnoreCase(bd.getHost()) &&
- (_port == bd.getPort()) &&
- _transport.equalsIgnoreCase(bd.getTransport()) &&
- (useSSL() == bd.useSSL());
+ (_port == bd.getPort()) &&
+ _transport.equalsIgnoreCase(bd.getTransport()) &&
+ (useSSL() == bd.useSSL());
//todo do we need to compare all the options as well?
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Dec 18 01:10:59 2006
@@ -44,6 +44,7 @@
import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
+import javax.jms.IllegalStateException;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
@@ -92,7 +93,7 @@
/**
* Maps from session id (Integer) to AMQSession instance
*/
- private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
@@ -142,7 +143,8 @@
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
+ username + ":" + password + "@" +
+ (clientName==null?"":clientName) +
virtualHost + "?brokerlist='" + broker + "'"));
}
@@ -157,11 +159,13 @@
{
this(new AMQConnectionURL(useSSL ?
ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
+ username + ":" + password + "@" +
+ (clientName==null?"":clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='true'" :
ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
+ username + ":" + password + "@" +
+ (clientName==null?"":clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='false'"
));
@@ -537,7 +541,10 @@
public void setClientID(String clientID) throws JMSException
{
checkNotClosed();
- _clientName = clientID;
+ // in AMQP it is not possible to change the client ID. If one is not specified
+ // upon connection construction, an id is generated automatically. Therefore
+ // we can always throw an exception.
+ throw new IllegalStateException("Client name cannot be changed after being set");
}
public ConnectionMetaData getMetaData() throws JMSException
@@ -583,7 +590,6 @@
public void stop() throws JMSException
{
checkNotClosed();
-
if (_started)
{
for (Iterator i = _sessions.values().iterator(); i.hasNext();)
@@ -920,8 +926,8 @@
void deregisterSession(int channelId)
{
_sessions.remove(channelId);
- }
-
+ }
+
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Dec 18 01:10:59 2006
@@ -27,6 +27,7 @@
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
@@ -367,13 +368,24 @@
public StreamMessage createStreamMessage() throws JMSException
{
- checkNotClosed();
- throw new UnsupportedOperationException("Stream messages not supported");
+ synchronized (_connection.getFailoverMutex())
+ {
+ checkNotClosed();
+
+ try
+ {
+ return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException("Unable to create text message: " + e);
+ }
+ }
}
public TextMessage createTextMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -462,28 +474,30 @@
// that can be called from a different thread of control from the one controlling the session
synchronized(_connection.getFailoverMutex())
{
- _closed.set(true);
-
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
-
- try
+ //Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
{
- _connection.getProtocolHandler().closeSession(this);
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(
- getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
- _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
- }
- catch (AMQException e)
- {
- throw new JMSException("Error closing session: " + e);
- }
- finally
- {
- _connection.deregisterSession(_channelId);
+ try
+ {
+ _connection.getProtocolHandler().closeSession(this);
+ final AMQFrame frame = ChannelCloseBody.createAMQFrame(
+ getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+ _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully
+
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException("Error closing session: " + e);
+ }
+ finally
+ {
+ _connection.deregisterSession(_channelId);
+ }
}
}
}
@@ -723,6 +737,7 @@
/**
* Creates a QueueReceiver
+ *
* @param destination
* @return QueueReceiver - a wrapper around our MessageConsumer
* @throws JMSException
@@ -736,6 +751,7 @@
/**
* Creates a QueueReceiver using a message selector
+ *
* @param destination
* @param messageSelector
* @return QueueReceiver - a wrapper around our MessageConsumer
@@ -826,7 +842,7 @@
final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
// TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = new FieldTable();
+ final FieldTable ft = FieldTableFactory.newFieldTable();
//if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
if (rawSelector != null)
@@ -935,6 +951,7 @@
public Queue createQueue(String queueName) throws JMSException
{
+ checkNotClosed();
if (queueName.indexOf('/') == -1)
{
return new AMQQueue(queueName);
@@ -957,12 +974,14 @@
/**
* Creates a QueueReceiver wrapping a MessageConsumer
+ *
* @param queue
* @return QueueReceiver
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -970,6 +989,7 @@
/**
* Creates a QueueReceiver wrapping a MessageConsumer using a message selector
+ *
* @param queue
* @param messageSelector
* @return QueueReceiver
@@ -977,6 +997,7 @@
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector);
@@ -985,11 +1006,15 @@
public QueueSender createSender(Queue queue) throws JMSException
{
- return (QueueSender) createProducer(queue);
+ checkNotClosed();
+ //return (QueueSender) createProducer(queue);
+ return new QueueSenderAdapter(createProducer(queue), queue);
}
public Topic createTopic(String topicName) throws JMSException
{
+ checkNotClosed();
+
if (topicName.indexOf('/') == -1)
{
return new AMQTopic(topicName);
@@ -1012,18 +1037,21 @@
/**
* Creates a non-durable subscriber
+ *
* @param topic
* @return TopicSubscriber - a wrapper round our MessageConsumer
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
/**
* Creates a non-durable subscriber with a message selector
+ *
* @param topic
* @param messageSelector
* @param noLocal
@@ -1032,6 +1060,7 @@
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
@@ -1045,6 +1074,7 @@
*/
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1055,6 +1085,7 @@
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
+ checkNotClosed();
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1062,26 +1093,32 @@
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- return (TopicPublisher) createProducer(topic);
+ checkNotClosed();
+ //return (TopicPublisher) createProducer(topic);
+ return new TopicPublisherAdapter(createProducer(topic), topic);
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
+ checkNotClosed();
throw new UnsupportedOperationException("Queue browsing not supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
+ checkNotClosed();
throw new UnsupportedOperationException("Queue browsing not supported");
}
public TemporaryQueue createTemporaryQueue() throws JMSException
{
+ checkNotClosed();
return new AMQTemporaryQueue();
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
+ checkNotClosed();
return new AMQTemporaryTopic();
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Dec 18 01:10:59 2006
@@ -159,11 +159,13 @@
public String getMessageSelector() throws JMSException
{
+ checkPreConditions();
return _messageSelector;
}
public MessageListener getMessageListener() throws JMSException
{
+ checkPreConditions();
return (MessageListener) _messageListener.get();
}
@@ -179,7 +181,7 @@
public void setMessageListener(MessageListener messageListener) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
//if the current listener is non-null and the session is not stopped, then
//it is an error to call this method.
@@ -277,7 +279,7 @@
public Message receive(long l) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
acquireReceiving();
@@ -311,7 +313,7 @@
public Message receiveNoWait() throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
acquireReceiving();
@@ -520,7 +522,7 @@
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(_consumerTag);
+ _session.deregisterConsumer(_consumerTag);
}
public String getConsumerTag()
@@ -529,7 +531,20 @@
}
public void setConsumerTag(String consumerTag)
- {
+ {
_consumerTag = consumerTag;
}
+
+ public AMQSession getSession() {
+ return _session;
+ }
+
+ private void checkPreConditions() throws JMSException{
+
+ this.checkNotClosed();
+
+ if(_session == null || _session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Dec 18 01:10:59 2006
@@ -143,6 +143,7 @@
public void setDisableMessageID(boolean b) throws JMSException
{
+ checkPreConditions();
checkNotClosed();
// IGNORED
}
@@ -156,7 +157,7 @@
public void setDisableMessageTimestamp(boolean b) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
_disableTimestamps = b;
}
@@ -168,7 +169,7 @@
public void setDeliveryMode(int i) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
@@ -185,7 +186,7 @@
public void setPriority(int i) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
if (i < 0 || i > 9)
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -201,7 +202,7 @@
public void setTimeToLive(long l) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
if (l < 0)
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -229,6 +230,7 @@
public void send(Message message) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
@@ -238,6 +240,7 @@
public void send(Message message, int deliveryMode) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -247,6 +250,7 @@
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -257,6 +261,7 @@
public void send(Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException
{
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
@@ -266,7 +271,7 @@
public void send(Destination destination, Message message) throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -279,7 +284,7 @@
int priority, long timeToLive)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -292,7 +297,7 @@
int priority, long timeToLive, boolean mandatory)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -305,7 +310,7 @@
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -319,7 +324,7 @@
boolean immediate, boolean waitUntilSent)
throws JMSException
{
- checkNotClosed();
+ checkPreConditions();
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -334,7 +339,7 @@
{
throw new JMSException("Unsupported destination class: " +
(destination != null ? destination.getClass() : null));
- }
+ }
declareDestination((AMQDestination)destination);
}
@@ -481,4 +486,20 @@
checkNotClosed();
_encoding = encoding;
}
+
+ private void checkPreConditions() throws IllegalStateException, JMSException {
+ checkNotClosed();
+
+ if(_destination == null){
+ throw new UnsupportedOperationException("Destination is null");
+ }
+
+ if(_session == null || _session.isClosed()){
+ throw new UnsupportedOperationException("Invalid Session");
+ }
+ }
+
+ public AMQSession getSession() {
+ return _session;
+ }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java Mon Dec 18 01:10:59 2006
@@ -39,31 +39,37 @@
public String getMessageSelector() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageSelector();
}
public MessageListener getMessageListener() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageListener();
}
public void setMessageListener(MessageListener messageListener) throws JMSException
{
+ checkPreConditions();
_consumer.setMessageListener(messageListener);
}
public Message receive() throws JMSException
{
+ checkPreConditions();
return _consumer.receive();
}
public Message receive(long l) throws JMSException
{
+ checkPreConditions();
return _consumer.receive(l);
}
public Message receiveNoWait() throws JMSException
{
+ checkPreConditions();
return _consumer.receiveNoWait();
}
@@ -79,8 +85,26 @@
*/
public Queue getQueue() throws JMSException
{
+ checkPreConditions();
return _queue;
}
+ private void checkPreConditions() throws javax.jms.IllegalStateException {
+ BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+ // Guard run at the top of each receiver method: consumer open, queue present, session usable.
+ if (msgConsumer.isClosed() ){
+ throw new javax.jms.IllegalStateException("Consumer is closed");
+ }
+
+ if(_queue == null){
+ throw new UnsupportedOperationException("Queue is null");
+ }
+
+ AMQSession session = msgConsumer.getSession();
+ // A missing or closed session is a state error, reported the same way as a closed consumer.
+ if(session == null || session.isClosed()){
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Mon Dec 18 01:10:59 2006
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -43,37 +44,45 @@
_consumer = consumer;
_noLocal = noLocal;
}
+
TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
{
this(topic, consumer, consumer.isNoLocal());
}
+
public Topic getTopic() throws JMSException
{
+ checkPreConditions();
return _topic;
}
public boolean getNoLocal() throws JMSException
{
+ checkPreConditions();
return _noLocal;
}
public String getMessageSelector() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageSelector();
}
public MessageListener getMessageListener() throws JMSException
{
+ checkPreConditions();
return _consumer.getMessageListener();
}
public void setMessageListener(MessageListener messageListener) throws JMSException
{
+ checkPreConditions();
_consumer.setMessageListener(messageListener);
}
public Message receive() throws JMSException
{
+ checkPreConditions();
return _consumer.receive();
}
@@ -84,6 +93,7 @@
public Message receiveNoWait() throws JMSException
{
+ checkPreConditions();
return _consumer.receiveNoWait();
}
@@ -91,4 +101,22 @@
{
_consumer.close();
}
+
+ private void checkPreConditions() throws javax.jms.IllegalStateException{
+ BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+ // Guard run at the top of each subscriber method: consumer open, topic present, session usable.
+ if (msgConsumer.isClosed() ){
+ throw new javax.jms.IllegalStateException("Consumer is closed");
+ }
+
+ if(_topic == null){
+ throw new UnsupportedOperationException("Topic is null");
+ }
+
+ AMQSession session = msgConsumer.getSession();
+ // A missing or closed session is a state error, reported the same way as a closed consumer.
+ if(session == null || session.isClosed()){
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Mon Dec 18 01:10:59 2006
@@ -32,6 +32,7 @@
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -117,7 +118,7 @@
}
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- FieldTable clientProperties = new FieldTable();
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.put("instance", ps.getClientID());
clientProperties.put("product", "Qpid");
clientProperties.put("version", "1.0");
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Mon Dec 18 01:10:59 2006
@@ -27,6 +27,7 @@
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
+import javax.jms.MessageEOFException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;