You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 13:13:28 UTC
svn commit: r496666 [2/3] - in /incubator/qpid/branches/perftesting/qpid:
java/broker/ java/broker/src/main/grammar/
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/filter/
java/broker/src/main/java/or...
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Jan 16 04:13:19 2007
@@ -23,12 +23,21 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.Queue;
+
/**
* Encapsulation of a supscription to a queue.
* <p/>
@@ -48,23 +57,30 @@
private final Object sessionKey;
+ private Queue<AMQMessage> _messages;
+
+ private final boolean _noLocal;
+
/**
* True if messages need to be acknowledged
*/
private final boolean _acks;
+ private FilterManager _filters;
+ private final boolean _isBrowser;
+ private final Boolean _autoClose;
+ private boolean _closed = false;
public static class Factory implements SubscriptionFactory
{
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
}
}
@@ -72,6 +88,13 @@
String consumerTag, boolean acks)
throws AMQException
{
+ this(channelId, protocolSession, consumerTag, acks, null, false);
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ throws AMQException
+ {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -83,8 +106,61 @@
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _noLocal = noLocal;
+
+ _filters = FilterManagerFactory.createManager(filters);
+
+
+ if (_filters != null)
+ {
+ Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+ if (isBrowser != null)
+ {
+ _isBrowser = (Boolean) isBrowser;
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+
+
+ if (_filters != null)
+ {
+ Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+
+
+ if (_filters != null)
+ {
+ _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
+ }
+ else
+ {
+ // Reference the DeliveryManager
+ _messages = null;
+ }
}
+
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
String consumerTag)
throws AMQException
@@ -129,9 +205,50 @@
{
if (msg != null)
{
+ if (_isBrowser)
+ {
+ sendToBrowser(msg, queue);
+ }
+ else
+ {
+ sendToConsumer(msg, queue);
+ }
+ }
+ else
+ {
+ _logger.error("Attempt to send Null message", new NullPointerException());
+ }
+ }
+
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
+
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
+ // received the message. If it is lost in transit that is not important.
+ if (_acks)
+ {
+ channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ protocolSession.writeFrame(frame);
+ }
+ }
+
+ private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ try
+ {
// if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
+ // we can decrement the reference count immediately.
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -157,9 +274,9 @@
protocolSession.writeFrame(frame);
}
}
- else
+ finally
{
- _logger.error("Attempt to send Null message", new NullPointerException());
+ msg.setDeliveredToConsumer();
}
}
@@ -177,6 +294,101 @@
{
channel.queueDeleted(queue);
}
+
+ public boolean hasFilters()
+ {
+ return _filters != null;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ if (_noLocal)
+ {
+ // We don't want local messages so check to see if message is one we sent
+ if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
+ else // if not then filter the message.
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
+ ") but not ours so filtering");
+ }
+ return checkFilters(msg);
+ }
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+ }
+ return checkFilters(msg);
+ }
+ }
+
+ private boolean checkFilters(AMQMessage msg)
+ {
+ if (_filters != null)
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+ }
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+ }
+
+ return true;
+ }
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return _messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ if (_messages != null)
+ {
+ _messages.offer(msg);
+ }
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public void close()
+ {
+ if (!_closed)
+ {
+ _logger.info("Closing autoclose subscription:" + this);
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ _closed = true;
+ }
+ }
+
+ public boolean isBrowser()
+ {
+ return _isBrowser;
+ }
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Jan 16 04:13:19 2007
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
/**
* Abstraction of actor that will determine the subscriber to whom
* a message will be sent.
*/
public interface SubscriptionManager
{
+ public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
public Subscription nextSubscriber(AMQMessage msg);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Jan 16 04:13:19 2007
@@ -21,6 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -90,7 +93,7 @@
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,31 +108,51 @@
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
}
}
+
return null;
}
@@ -145,11 +168,19 @@
return _subscriptions.isEmpty();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
public boolean hasActiveSubscribers()
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -159,7 +190,10 @@
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -167,6 +201,7 @@
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
public void queueDeleted(AMQQueue queue)
@@ -177,7 +212,8 @@
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Tue Jan 16 04:13:19 2007
@@ -35,7 +35,7 @@
*/
class SynchronizedDeliveryManager implements DeliveryManager
{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+ private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
/**
* Holds any queued messages
@@ -122,6 +122,11 @@
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -243,7 +248,6 @@
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java Tue Jan 16 04:13:19 2007
@@ -62,7 +62,7 @@
}
// we do not currently support authcid in any meaningful way
- String authcid = new String(response, 0, authzidNullPosition, "utf8");
+ // String authcid = new String(response, 0, authzidNullPosition, "utf8");
String authzid = new String(response, authzidNullPosition + 1, authcidNullPosition - 1, "utf8");
// we do not care about the prompt but it throws if null
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java Tue Jan 16 04:13:19 2007
@@ -20,10 +20,15 @@
*/
package org.apache.qpid.server.util;
+import org.apache.log4j.Logger;
+
import java.util.Iterator;
public class CircularBuffer implements Iterable
{
+
+ private static final Logger _logger = Logger.getLogger(CircularBuffer.class);
+
private final Object[] _log;
private int _size;
private int _index;
@@ -102,7 +107,7 @@
{
for(Object o : this)
{
- System.out.println(o);
+ _logger.info(o);
}
}
@@ -120,7 +125,7 @@
for(String s : items)
{
buffer.add(s);
- System.out.println(buffer);
+ _logger.info(buffer);
}
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Jan 16 04:13:19 2007
@@ -139,6 +139,12 @@
*/
private AMQException _lastAMQException = null;
+
+ /*
+ * The connection meta data
+ */
+ private QpidConnectionMetaData _connectionMetaData;
+
public AMQConnection(String broker, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
@@ -281,6 +287,7 @@
throw e;
}
+ _connectionMetaData = new QpidConnectionMetaData(this);
}
protected boolean checkException(Throwable thrown)
@@ -550,7 +557,7 @@
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
- return QpidConnectionMetaData.instance();
+ return _connectionMetaData;
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 16 04:13:19 2007
@@ -23,13 +23,15 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -69,15 +71,15 @@
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled
+ * correctly. Note this only keeps a record of subscriptions which have been created
+ * in the current instance. It does not remember subscriptions between executions of the
+ * client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -143,6 +145,7 @@
private boolean _inRecovery;
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -176,7 +179,7 @@
{
if (message.deliverBody != null)
{
- final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
if (consumer == null)
{
@@ -210,17 +213,15 @@
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
}
+
}
catch (Exception e)
{
@@ -318,7 +319,7 @@
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -334,7 +335,7 @@
public MapMessage createMapMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -350,7 +351,7 @@
public javax.jms.Message createMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -366,7 +367,7 @@
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -382,7 +383,7 @@
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -400,7 +401,7 @@
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -417,7 +418,7 @@
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -434,7 +435,7 @@
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -504,7 +505,7 @@
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -569,7 +570,7 @@
*/
public void closed(Throwable e)
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -721,11 +722,11 @@
public void acknowledge() throws JMSException
{
- if(isClosed())
+ if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- for(BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.acknowledge();
}
@@ -734,7 +735,6 @@
}
-
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
@@ -843,7 +843,9 @@
false,
false,
null,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -855,7 +857,9 @@
false,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -868,7 +872,26 @@
noLocal,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
+ }
+
+ public MessageConsumer createBrowserConsumer(Destination destination,
+ String messageSelector,
+ boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null,
+ true,
+ true);
}
public MessageConsumer createConsumer(Destination destination,
@@ -878,7 +901,7 @@
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
}
@@ -890,7 +913,7 @@
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -902,7 +925,7 @@
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -915,7 +938,7 @@
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
protected MessageConsumer createConsumerImpl(final Destination destination,
@@ -924,7 +947,9 @@
final boolean noLocal,
final boolean exclusive,
final String selector,
- final FieldTable rawSelector) throws JMSException
+ final FieldTable rawSelector,
+ final boolean noConsume,
+ final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -948,12 +973,18 @@
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
- _acknowledgeMode);
+ _acknowledgeMode, noConsume, autoClose);
try
{
registerConsumer(consumer, false);
}
+ catch (AMQInvalidSelectorException ise)
+ {
+ JMSException ex = new InvalidSelectorException(ise.getMessage());
+ ex.setLinkedException(ise);
+ throw ex;
+ }
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +994,7 @@
synchronized(destination)
{
- _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
}
@@ -975,16 +1006,16 @@
private void checkTemporaryDestination(Destination destination)
throws JMSException
{
- if((destination instanceof TemporaryDestination))
+ if ((destination instanceof TemporaryDestination))
{
_logger.debug("destination is temporary");
final TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession() != this)
+ if (tempDest.getSession() != this)
{
_logger.debug("destination is on different session");
throw new JMSException("Cannot consume from a temporary destination created onanother session");
}
- if(tempDest.isDeleted())
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1096,26 @@
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
- boolean nowait) throws AMQException
+ boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if (messageSelector != null && !messageSelector.equals(""))
+ {
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+ }
+ if(consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+ if(consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
+
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
@@ -1080,7 +1125,7 @@
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
queueName, tag, consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait);
+ consumer.isExclusive(), nowait, arguments);
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1265,7 @@
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1247,8 +1292,8 @@
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1278,8 +1323,8 @@
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1291,16 +1336,14 @@
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1476,7 +1519,14 @@
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ try
+ {
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ }
+ catch (JMSException e) //thrown by getMessageSelector
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
}
/**
@@ -1489,7 +1539,7 @@
{
_consumers.remove(consumer.getConsumerTag());
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if(subscriptionName != null)
+ if (subscriptionName != null)
{
_subscriptions.remove(subscriptionName);
}
@@ -1497,7 +1547,7 @@
Destination dest = consumer.getDestination();
synchronized(dest)
{
- if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
{
_destinationConsumerCount.remove(dest);
}
@@ -1567,6 +1617,16 @@
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
+ public void confirmConsumerCancelled(String consumerTag)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ if((consumer != null) && (consumer.isAutoClose()))
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ }
+
+
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
@@ -1576,7 +1636,7 @@
{
throw new javax.jms.InvalidDestinationException("Invalid Topic");
}
- if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+ if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
}
@@ -1597,4 +1657,5 @@
throw new javax.jms.InvalidDestinationException("Invalid Queue");
}
}
+
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
@@ -39,6 +41,7 @@
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.jms.Destination;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -142,10 +145,19 @@
*/
private Thread _receivingThread;
+ /**
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
+ */
+ private boolean _autoClose;
+ private boolean _closeWhenNoMessages;
+
+ private boolean _noConsume;
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+ boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -161,6 +173,8 @@
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+ _autoClose = autoClose;
+ _noConsume = noConsume;
}
public AMQDestination getDestination()
@@ -241,6 +255,17 @@
if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+ String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString());
+ try
+ {
+ Destination dest = AMQDestination.createDestination(new AMQBindingURL(url));
+ jmsMsg.setJMSDestination(dest);
+ }
+ catch (URLSyntaxException e)
+ {
+ _logger.warn("Unable to parse the supplied destination header: " + url);
+ }
+
}
_session.setInRecovery(false);
}
@@ -307,6 +332,10 @@
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = null;
if (l > 0)
{
@@ -336,6 +365,19 @@
}
}
+ private boolean closeOnAutoClose() throws JMSException
+ {
+ if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+ {
+ close(false);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -344,6 +386,10 @@
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -388,22 +434,31 @@
}
}
+
public void close() throws JMSException
{
+ close(true);
+ }
+
+ public void close(boolean sendClose) throws JMSException
+ {
synchronized(_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
-
- try
+ if(sendClose)
{
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
- }
- catch (AMQException e)
- {
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+ try
+ {
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error closing consumer: " + e, e);
+ throw new JMSException("Error closing consumer: " + e);
+ }
}
deregisterConsumer();
@@ -499,6 +554,12 @@
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ break;
case Session.DUPS_OK_ACKNOWLEDGE:
if (++_outstanding >= _prefetchHigh)
{
@@ -525,7 +586,14 @@
}
break;
case Session.SESSION_TRANSACTED:
- _lastDeliveryTag = msg.getDeliveryTag();
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _lastDeliveryTag = msg.getDeliveryTag();
+ }
break;
}
}
@@ -615,5 +683,30 @@
public void clearUnackedMessages()
{
_unacknowledgedDeliveryTags.clear();
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+
+ public boolean isNoConsume()
+ {
+ return _noConsume;
+ }
+
+ public void closeWhenNoMessages(boolean b)
+ {
+ _closeWhenNoMessages = b;
+
+ if(_closeWhenNoMessages
+ && _synchronousQueue.isEmpty()
+ && _receiving.get()
+ && _messageListener != null)
+ {
+ _receivingThread.interrupt();
+ }
+
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Jan 16 04:13:19 2007
@@ -507,8 +507,11 @@
long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
checkTemporaryDestination(destination);
+ origMessage.setJMSDestination(destination);
+
AbstractJMSMessage message = convertToNativeMessage(origMessage);
+ message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Tue Jan 16 04:13:19 2007
@@ -1,50 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.client;
+import org.apache.qpid.common.QpidProperties;
+
import java.util.Enumeration;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
-public class QpidConnectionMetaData implements ConnectionMetaData {
+public class QpidConnectionMetaData implements ConnectionMetaData
+{
+
+
+ QpidConnectionMetaData(AMQConnection conn)
+ {
+ }
+
+ public int getJMSMajorVersion() throws JMSException
+ {
+ return 1;
+ }
+
+ public int getJMSMinorVersion() throws JMSException
+ {
+ return 1;
+ }
+
+ public String getJMSProviderName() throws JMSException
+ {
+ return "Apache " + QpidProperties.getProductName();
+ }
+
+ public String getJMSVersion() throws JMSException
+ {
+ return "1.1";
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException
+ {
+ return CustomJMXProperty.asEnumeration();
+ }
+
+ public int getProviderMajorVersion() throws JMSException
+ {
+ return 0;
+ }
+
+ public int getProviderMinorVersion() throws JMSException
+ {
+ return 8;
+ }
+
+ public String getProviderVersion() throws JMSException
+ {
+ return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+ + getProtocolVersion() + "] )";
+ }
+
+ private String getProtocolVersion()
+ {
+ // TODO - Implement based on connection negotiated protocol
+ return "0.8";
+ }
+
+ public String getBrokerVersion()
+ {
+ // TODO - get broker version
+ return "<unkown>";
+ }
+
+ public String getClientVersion()
+ {
+ return QpidProperties.getBuildVerision();
+ }
+
- private static QpidConnectionMetaData _instance = new QpidConnectionMetaData();
-
- private QpidConnectionMetaData(){
- }
-
- public static QpidConnectionMetaData instance(){
- return _instance;
- }
-
- public int getJMSMajorVersion() throws JMSException {
- return 1;
- }
-
- public int getJMSMinorVersion() throws JMSException {
- return 1;
- }
-
- public String getJMSProviderName() throws JMSException {
- return "Apache Qpid";
- }
-
- public String getJMSVersion() throws JMSException {
- return "1.1";
- }
-
- public Enumeration getJMSXPropertyNames() throws JMSException {
- return null;
- }
-
- public int getProviderMajorVersion() throws JMSException {
- return 0;
- }
-
- public int getProviderMinorVersion() throws JMSException {
- return 9;
- }
-
- public String getProviderVersion() throws JMSException {
- return "Incubating-M1";
- }
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
- _logger.debug("ChannelClose method received");
+ _logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
int errorCode = method.replyCode;
@@ -65,17 +66,21 @@
{
throw new AMQNoConsumersException("Error: " + reason, null);
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+ {
+ _logger.info("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(reason);
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- throw new AMQNoRouteException("Error: " + reason, null);
- }
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason);
- }
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
+
}
evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -20,20 +20,14 @@
*/
package org.apache.qpid.client.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
{
-
- private static final Logger _logger = Logger.getLogger(ConnectionOpenOkMethodHandler.class);
-
private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler();
public static ConnectionOpenOkMethodHandler getInstance()
@@ -47,8 +41,6 @@
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
- AMQProtocolSession session = evt.getProtocolSession();
- ConnectionOpenOkBody method = (ConnectionOpenOkBody) evt.getMethod();
stateManager.changeState(AMQState.CONNECTION_OPEN);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,10 +121,11 @@
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put("instance", ps.getClientID());
- clientProperties.put("product", "Qpid");
- clientProperties.put("version", "1.0");
- clientProperties.put("platform", getFullSystemInfo());
+
+ clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+ clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVerision());
+ clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
saslResponse, selectedLocale));
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Jan 16 04:13:19 2007
@@ -26,7 +26,8 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -46,7 +47,8 @@
protected ByteBuffer _data;
private boolean _readableProperties = false;
- private boolean _readableMessage = false;
+ protected boolean _readableMessage = false;
+ protected boolean _changedData;
private Destination _destination;
private BasicMessageConsumer _consumer;
@@ -60,6 +62,7 @@
}
_readableProperties = false;
_readableMessage = (data != null);
+ _changedData = (data == null);
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -172,13 +175,12 @@
public Destination getJMSDestination() throws JMSException
{
- // TODO: implement this once we have sorted out how to figure out the exchange class
- return _destination;
+ return _destination;
}
public void setJMSDestination(Destination destination) throws JMSException
{
- _destination = destination;
+ _destination = destination;
}
public int getJMSDeliveryMode() throws JMSException
@@ -522,16 +524,16 @@
return !_readableMessage;
}
- public void reset()
+ public void reset()
{
- if (_readableMessage)
+ if (!_changedData)
{
_data.rewind();
}
else
{
_data.flip();
- _readableMessage = true;
+ _changedData = false;
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Tue Jan 16 04:13:19 2007
@@ -59,6 +59,12 @@
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -226,48 +232,56 @@
public void writeBoolean(boolean b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b ? (byte) 1 : (byte) 0);
}
public void writeByte(byte b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b);
}
public void writeShort(short i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putShort(i);
}
public void writeChar(char c) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putChar(c);
}
public void writeInt(int i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putInt(i);
}
public void writeLong(long l) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putLong(l);
}
public void writeFloat(float v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putFloat(v);
}
public void writeDouble(double v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putDouble(v);
}
@@ -281,7 +295,7 @@
_data.putShort((short)encodedString.limit());
_data.put(encodedString);
-
+ _changedData = true;
//_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must add the null terminator manually
//_data.put((byte)0);
@@ -298,12 +312,14 @@
{
checkWritable();
_data.put(bytes);
+ _changedData = true;
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
checkWritable();
_data.put(bytes, offset, length);
+ _changedData = true;
}
public void writeObject(Object object) throws JMSException
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Tue Jan 16 04:13:19 2007
@@ -112,7 +112,7 @@
}
}
-
+
public Serializable getObject() throws JMSException
{
ObjectInputStream in = null;
@@ -123,18 +123,18 @@
try
{
- _data.rewind();
+ _data.rewind();
in = new ObjectInputStream(_data.asInputStream());
return (Serializable) in.readObject();
}
catch (IOException e)
- {
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ {
+ e.printStackTrace();
+ throw new MessageFormatException("Could not deserialize message: " + e);
}
catch (ClassNotFoundException e)
{
- e.printStackTrace();
+ e.printStackTrace();
throw new MessageFormatException("Could not deserialize message: " + e);
}
finally
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Tue Jan 16 04:13:19 2007
@@ -86,6 +86,12 @@
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -103,6 +109,7 @@
{
checkWritable();
_data.put(type);
+ _changedData = true;
}
public boolean readBoolean() throws JMSException
@@ -693,7 +700,7 @@
{
_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must write the null terminator ourselves
- _data.put((byte)0);
+ _data.put((byte) 0);
}
catch (CharacterCodingException e)
{
@@ -706,7 +713,7 @@
public void writeBytes(byte[] bytes) throws JMSException
{
- writeBytes(bytes, 0, bytes == null?0:bytes.length);
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Tue Jan 16 04:13:19 2007
@@ -117,6 +117,7 @@
{
_data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
}
+ _changedData=true;
}
_decodedValue = text;
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Jan 16 04:13:19 2007
@@ -406,4 +406,12 @@
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
+
+ public void confirmConsumerCancelled(int channelId, String consumerTag)
+ {
+ final Integer chId = channelId;
+ final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+
+ session.confirmConsumerCancelled(consumerTag);
+ }
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Jan 16 04:13:19 2007
@@ -110,7 +110,7 @@
}
else
{
- throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+ throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Tue Jan 16 04:13:19 2007
@@ -103,6 +103,7 @@
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+ frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Jan 16 04:13:19 2007
@@ -59,7 +59,7 @@
// once more testing of the performance of the simple allocator has been done
if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
{
- _logger.warn("Using SimpleByteBufferAllocator");
+ _logger.info("Using SimpleByteBufferAllocator");
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Tue Jan 16 04:13:19 2007
@@ -127,7 +127,7 @@
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
- TextMessage tm2 = (TextMessage) consumer.receive();
+ consumer.receive();
tm.acknowledge();
_logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java Tue Jan 16 04:13:19 2007
@@ -31,7 +31,7 @@
public class MultipleConnectionTest extends TestCase
{
public static final String _defaultBroker = "vm://:1";
- public static String _connectionString = _defaultBroker;
+ public String _connectionString = _defaultBroker;
private static class Receiver
{
@@ -175,9 +175,6 @@
public static void main(String[] argv) throws Exception
{
String broker = argv.length > 0 ? argv[0] : _defaultBroker;
-
- int connections = 7;
- int sessions = 2;
MultipleConnectionTest test = new MultipleConnectionTest();
test._connectionString = broker;
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Jan 16 04:13:19 2007
@@ -116,7 +116,9 @@
m.setIntProperty("Int", (int) Integer.MAX_VALUE);
m.setJMSCorrelationID("Correlation");
- m.setJMSPriority(100);
+ //fixme the m.setJMSMessage has no effect
+ producer.setPriority(8);
+ m.setJMSPriority(3);
// Queue
Queue q;
@@ -182,10 +184,8 @@
(int) Integer.MAX_VALUE, m.getIntProperty("Int"));
Assert.assertEquals("Check CorrelationID properties are correctly transported",
"Correlation", m.getJMSCorrelationID());
-
- _logger.warn("getJMSPriority not being verified.");
-// Assert.assertEquals("Check Priority properties are correctly transported",
-// 100, m.getJMSPriority());
+ Assert.assertEquals("Check Priority properties are correctly transported",
+ 8, m.getJMSPriority());
// Queue
Assert.assertEquals("Check ReplyTo properties are correctly transported",
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Tue Jan 16 04:13:19 2007
@@ -121,7 +121,7 @@
{
if (_connection != null)
{
- System.out.println(">>>>>>>>>>>>>>.. closing");
+ _log.info(">>>>>>>>>>>>>>.. closing");
_connection.close();
}
}
@@ -137,7 +137,7 @@
{
public void onException(JMSException jmsException)
{
- _log.error("onException - ", jmsException);
+ _log.warn("onException - "+jmsException.getMessage());
}
});
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java Tue Jan 16 04:13:19 2007
@@ -25,7 +25,6 @@
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageConsumer;
-import javax.jms.Message;
/**
* @author Apache Software Foundation
@@ -72,7 +71,7 @@
};
long startTime = System.currentTimeMillis();
new Thread(r).start();
- Message m = consumer.receive(10000);
+ consumer.receive(10000);
assertTrue(System.currentTimeMillis() - startTime < 10000);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Tue Jan 16 04:13:19 2007
@@ -54,8 +54,7 @@
{
try
{
- Connection connection = new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ new AMQConnection(_broker, "guest", "guest", "fred", "/test");
}
catch (Exception e)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java Tue Jan 16 04:13:19 2007
@@ -108,7 +108,7 @@
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.setString("value", null);
- char c = mm.getChar("value");
+ mm.getChar("value");
fail("Expected NullPointerException");
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Tue Jan 16 04:13:19 2007
@@ -89,10 +89,10 @@
StreamMessage msg2 = (StreamMessage) consumer.receive();
- byte b1 = msg2.readByte();
+ msg2.readByte();
try
{
- byte b2 = msg2.readByte();
+ msg2.readByte();
}
catch (Exception e)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Tue Jan 16 04:13:19 2007
@@ -260,7 +260,7 @@
TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
- MessageConsumer consumer2 = session2.createConsumer(topic);
+ session2.createConsumer(topic);
fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
}
catch (JMSException je)
Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java Tue Jan 16 04:13:19 2007
@@ -23,6 +23,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -80,6 +82,11 @@
public int getWeight()
{
return ClusteredSubscriptionManager.this.getWeight();
+ }
+
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
}
public boolean hasActiveSubscribers()