You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 13:51:52 UTC
svn commit: r1295541 [7/10] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/bin/
bdbstore/etc/scripts/ bdbstore/src/main/java/ bdbstore/src/resources/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/access-...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar 1 12:51:40 2012
@@ -310,7 +310,10 @@ public abstract class AMQSession<C exten
/** Holds the highest received delivery tag. */
protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
-
+
+ /** Pre-fetched message tags */
+ protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
+
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
@@ -2925,11 +2928,6 @@ public abstract class AMQSession<C exten
_producers.put(new Long(producerId), producer);
}
- private void rejectAllMessages(boolean requeue)
- {
- rejectMessagesForConsumerTag(0, requeue, true);
- }
-
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
@@ -3235,7 +3233,7 @@ public abstract class AMQSession<C exten
for (C consumer : _consumers.values())
{
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
- _unacknowledgedMessageTags.addAll(tags);
+ _prefetchedMessageTags.addAll(tags);
}
setConnectionStopped(isStopped);
@@ -3345,7 +3343,7 @@ public abstract class AMQSession<C exten
}
else if (_usingDispatcherForCleanup)
{
- _unacknowledgedMessageTags.add(deliveryTag);
+ _prefetchedMessageTags.add(deliveryTag);
}
else
{
@@ -3548,4 +3546,5 @@ public abstract class AMQSession<C exten
_logger.debug("Rollback mark is set to " + _rollbackMark.get());
}
}
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Mar 1 12:51:40 2012
@@ -27,11 +27,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -47,8 +50,8 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -142,9 +145,9 @@ public class AMQSession_0_10 extends AMQ
private int unackedCount = 0;
/**
- * USed to store the range of in tx messages
+ * Used to store the range of in tx messages
*/
- private RangeSet _txRangeSet = new RangeSet();
+ private final RangeSet _txRangeSet = new RangeSet();
private int _txSize = 0;
//--- constructors
@@ -457,18 +460,33 @@ public class AMQSession_0_10 extends AMQ
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
- RangeSet ranges = gatherUnackedRangeSet();
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ RangeSet all = new RangeSet();
+ RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+ {
+ Range range = deliveredIter.next();
+ all.add(range);
+ }
+ for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+ {
+ Range range = prefetchedIter.next();
+ all.add(range);
+ }
+ flushProcessed(all, false);
+ getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(prefetched);
+
// We need to sync so that we get notify of an error.
sync();
}
- private RangeSet gatherUnackedRangeSet()
+ private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags)
{
RangeSet ranges = new RangeSet();
while (true)
{
- Long tag = _unacknowledgedMessageTags.poll();
+ Long tag = messageTags.poll();
if (tag == null)
{
break;
@@ -480,12 +498,15 @@ public class AMQSession_0_10 extends AMQ
return ranges;
}
-
public void releaseForRollback()
{
- getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
- _txRangeSet.clear();
- _txSize = 0;
+ if (_txSize > 0)
+ {
+ flushProcessed(_txRangeSet, false);
+ getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
+ _txRangeSet.clear();
+ _txSize = 0;
+ }
}
/**
@@ -499,7 +520,15 @@ public class AMQSession_0_10 extends AMQ
// The value of requeue is always true
RangeSet ranges = new RangeSet();
ranges.add((int) deliveryTag);
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ flushProcessed(ranges, false);
+ if (requeue)
+ {
+ getQpidSession().messageRelease(ranges);
+ }
+ else
+ {
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ }
//I don't think we need to sync
}
@@ -737,7 +766,7 @@ public class AMQSession_0_10 extends AMQ
Map<String,Object> arguments = new HashMap<String,Object>();
if (noLocal)
{
- arguments.put("no-local", true);
+ arguments.put(AddressHelper.NO_LOCAL, true);
}
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
@@ -1316,11 +1345,11 @@ public class AMQSession_0_10 extends AMQ
protected void acknowledgeImpl()
{
- RangeSet range = gatherUnackedRangeSet();
+ RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags);
- if(range.size() > 0 )
+ if(ranges.size() > 0 )
{
- messageAcknowledge(range, true);
+ messageAcknowledge(ranges, true);
getQpidSession().sync();
}
}
@@ -1333,6 +1362,13 @@ public class AMQSession_0_10 extends AMQ
// messages sent by the brokers following the first rollback
// after failover
_highestDeliveryTag.set(-1);
+ // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
+ //messages that came from the old broker.
+ _txRangeSet.clear();
+ _txSize = 0;
+ _unacknowledgedMessageTags.clear();
+ _prefetchedMessageTags.clear();
super.resubscribe();
+ getQpidSession().sync();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Mar 1 12:51:40 2012
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
+import java.util.ArrayList;
import java.util.Map;
import javax.jms.Destination;
@@ -40,7 +41,6 @@ import org.apache.qpid.client.protocol.A
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
@@ -62,7 +62,6 @@ import org.apache.qpid.framing.ExchangeB
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
@@ -223,6 +222,8 @@ public final class AMQSession_0_8 extend
public void sendRecover() throws AMQException, FailoverException
{
+ enforceRejectBehaviourDuringRecover();
+ _prefetchedMessageTags.clear();
_unacknowledgedMessageTags.clear();
if (isStrictAMQP())
@@ -259,6 +260,49 @@ public final class AMQSession_0_8 extend
}
}
+ private void enforceRejectBehaviourDuringRecover()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags);
+ }
+ ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values());
+ boolean messageListenerFound = false;
+ boolean serverRejectBehaviourFound = false;
+ for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
+ {
+ if (consumer.isMessageListenerSet())
+ {
+ messageListenerFound = true;
+ }
+ if (RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
+ {
+ serverRejectBehaviourFound = true;
+ }
+ }
+ _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)");
+
+ if (serverRejectBehaviourFound)
+ {
+ //reject(false) any messages we don't want returned again
+ switch(_acknowledgeMode)
+ {
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ case Session.AUTO_ACKNOWLEDGE:
+ if (!messageListenerFound)
+ {
+ break;
+ }
+ case Session.CLIENT_ACKNOWLEDGE:
+ for(Long tag : _unacknowledgedMessageTags)
+ {
+ rejectMessage(tag, false);
+ }
+ break;
+ }
+ }
+ }
+
public void releaseForRollback()
{
// Reject all the messages that have been received in this session and
@@ -267,6 +311,17 @@ public final class AMQSession_0_8 extend
// Otherwise messages will be able to arrive out of order to a second
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
+ boolean normalRejectBehaviour = true;
+ for (BasicMessageConsumer_0_8 consumer : _consumers.values())
+ {
+ if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
+ {
+ normalRejectBehaviour = false;
+ //no need to consult other consumers now, found server behaviour.
+ break;
+ }
+ }
+
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -275,13 +330,14 @@ public final class AMQSession_0_8 extend
break;
}
- rejectMessage(tag, true);
+ rejectMessage(tag, normalRejectBehaviour);
}
}
public void rejectMessage(long deliveryTag, boolean requeue)
{
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
+ if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)||
+ ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
{
if (_logger.isDebugEnabled())
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Mar 1 12:51:40 2012
@@ -147,7 +147,6 @@ public abstract class BasicMessageConsum
private List<StackTraceElement> _closedStack = null;
-
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -211,6 +210,7 @@ public abstract class BasicMessageConsum
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
_arguments = ft;
+
}
public AMQDestination getDestination()
@@ -814,31 +814,6 @@ public abstract class BasicMessageConsum
}
}
-
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- *
- * @return the lastDeliveryTag to acknowledge
- */
- Long getLastDelivered()
- {
- if (!_receivedDeliveryTags.isEmpty())
- {
- Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- return lastDeliveryTag;
- }
-
- return null;
- }
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Mar 1 12:51:40 2012
@@ -470,7 +470,8 @@ public class BasicMessageConsumer_0_10 e
}
}
- _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ _0_10session.flushProcessed(ranges, false);
+ _0_10session.getQpidSession().messageRelease(ranges);
clearReceiveQueue();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Mar 1 12:51:40 2012
@@ -28,7 +28,10 @@ import org.apache.qpid.client.failover.F
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,8 @@ public class BasicMessageConsumer_0_8 ex
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final RejectBehaviour _rejectBehaviour;
+
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
@@ -55,6 +60,25 @@ public class BasicMessageConsumer_0_8 ex
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+ if (destination.getRejectBehaviour() != null)
+ {
+ _rejectBehaviour = destination.getRejectBehaviour();
+ }
+ else
+ {
+ ConnectionURL connectionURL = connection.getConnectionURL();
+ String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR);
+ if (rejectBehaviour != null)
+ {
+ _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase());
+ }
+ else
+ {
+ // use the default value for all connections, if not set
+ rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString());
+ _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase());
+ }
+ }
}
void sendCancel() throws AMQException, FailoverException
@@ -89,4 +113,9 @@ public class BasicMessageConsumer_0_8 ex
{
}
+
+ public RejectBehaviour getRejectBehaviour()
+ {
+ return _rejectBehaviour;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java Thu Mar 1 12:51:40 2012
@@ -123,7 +123,7 @@ public class TopicPublisherAdapter imple
public void send(Destination dest, Message msg) throws JMSException
{
checkPreConditions();
- checkTopic(_topic);
+ checkTopic(dest);
_delegate.send(dest, msg);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Thu Mar 1 12:51:40 2012
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -75,4 +75,11 @@ public class XAConnectionImpl extends AM
{
return (XATopicSession) createXASession();
}
+
+ //Specialized call for JCA
+ public XASession createXASession(int ackMode) throws JMSException
+ {
+ checkNotClosed();
+ return _delegate.createXASession(ackMode);
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Thu Mar 1 12:51:40 2012
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE 2.0
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,21 +43,36 @@ public class XASessionImpl extends AMQSe
private Session _jmsSession;
- //-- Constructors
+ // Constructors
/**
* Create a JMS XASession
*/
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
int defaultPrefetchHigh, int defaultPrefetchLow)
{
- super(qpidConnection, con, channelId, false, // this is not a transacted session
- Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
+ this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE,
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow, null);
+ }
+
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
+ int ackMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ {
+ this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(),
+ defaultPrefetchHigh, defaultPrefetchLow, null);
+
+ }
+
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
+ boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow,
+ String name)
+ {
+ super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name);
createSession();
_xaResource = new XAResourceImpl(this);
- }
+ }
+
- //-- public methods
+ // public methods
/**
* Create a qpid session.
@@ -70,7 +85,7 @@ public class XASessionImpl extends AMQSe
}
- //--- javax.njms.XASEssion API
+ // javax.njms.XASEssion API
/**
* Gets the session associated with this XASession.
@@ -97,7 +112,7 @@ public class XASessionImpl extends AMQSe
return _xaResource;
}
- //-- overwritten mehtods
+ // overwritten mehtods
/**
* Throws a {@link TransactionInProgressException}, since it should
* not be called for an XASession object.
@@ -132,7 +147,7 @@ public class XASessionImpl extends AMQSe
return _qpidDtxSession;
}
- //--- interface XAQueueSession
+ // interface XAQueueSession
/**
* Gets the topic session associated with this <CODE>XATopicSession</CODE>.
*
@@ -144,7 +159,7 @@ public class XASessionImpl extends AMQSe
return (QueueSession) getSession();
}
- //--- interface XATopicSession
+ // interface XATopicSession
/**
* Gets the topic session associated with this <CODE>XATopicSession</CODE>.
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Thu Mar 1 12:51:40 2012
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,3 +30,4 @@ CRAM-MD5-HASHED.3=org.apache.qpid.client
CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+ANONYMOUS.7=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Thu Mar 1 12:51:40 2012
@@ -41,7 +41,16 @@ public interface ConnectionURL
public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
- public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+
+ /**
+ * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
+ * <p>
+ * It tells the client to delegate the requeue/DLQ decision to the
+ * server .If this option is not specified, the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ */
+ public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour";
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Thu Mar 1 12:51:40 2012
@@ -287,29 +287,6 @@ public class AMQSession_0_10Test extends
assertNotNull("ExecutionSync was not sent", event);
}
- public void testRejectMessage()
- {
- AMQSession_0_10 session = createAMQSession_0_10();
- session.rejectMessage(1l, true);
- ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
- assertNotNull("MessageRelease event was not sent", event);
- }
-
- public void testReleaseForRollback()
- {
- AMQSession_0_10 session = createAMQSession_0_10();
- try
- {
- session.releaseForRollback();
- }
- catch (Exception e)
- {
- fail("Unexpected exception is cought:" + e.getMessage());
- }
- ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
- assertNotNull("MessageRelease event was not sent", event);
- }
-
public void testSendQueueDelete()
{
AMQSession_0_10 session = createAMQSession_0_10();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Thu Mar 1 12:51:40 2012
@@ -55,4 +55,9 @@ public class MockAMQConnection extends A
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN);
return null;
}
+
+ public AMQConnectionDelegate getDelegate()
+ {
+ return _delegate;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Thu Mar 1 12:51:40 2012
@@ -20,19 +20,35 @@
*/
package org.apache.qpid.test.unit.client.BrokerDetails;
-import java.util.HashMap;
-import java.util.Map;
-
import junit.framework.TestCase;
import org.apache.qpid.client.AMQBrokerDetails;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.url.URLSyntaxException;
public class BrokerDetailsTest extends TestCase
{
+ public void testDefaultTCP_NODELAY() throws URLSyntaxException
+ {
+ String brokerURL = "tcp://localhost:5672";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+ assertNull("default value should be null", broker.getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY));
+ }
+
+ public void testOverridingTCP_NODELAY() throws URLSyntaxException
+ {
+ String brokerURL = "tcp://localhost:5672?tcp_nodelay='true'";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+ assertTrue("value should be true", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)));
+
+ brokerURL = "tcp://localhost:5672?tcp_nodelay='false''&maxprefetch='1'";
+ broker = new AMQBrokerDetails(brokerURL);
+
+ assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)));
+ }
+
public void testMultiParameters() throws URLSyntaxException
{
String url = "tcp://localhost:5672?timeout='200',immediatedelivery='true'";
@@ -82,9 +98,4 @@ public class BrokerDetailsTest extends T
}
}
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(BrokerDetailsTest.class);
- }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Thu Mar 1 12:51:40 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ public class ConnectionURLTest extends T
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
- assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));
+ assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
assertTrue(connectionurl.getVirtualHost().equals("/test"));
@@ -274,6 +274,34 @@ public class ConnectionURLTest extends T
// assertTrue(service.getPort() == 1234);
}
+ /**
+ * Test for QPID-3662 to ensure the {@code toString()} representation is correct.
+ */
+ public void testConnectionURLOptionToString() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertNull(connectionurl.getFailoverMethod());
+ assertEquals("guest", connectionurl.getUsername());
+ assertEquals("guest", connectionurl.getPassword());
+ assertEquals("client", connectionurl.getClientName());
+ assertEquals("/localhost", connectionurl.getVirtualHost());
+ assertEquals("1", connectionurl.getOption("maxprefetch"));
+ assertTrue(connectionurl.getBrokerCount() == 1);
+
+ BrokerDetails service = connectionurl.getBrokerDetails(0);
+ assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getHost().equals("localhost"));
+ assertTrue(service.getPort() == 1234);
+ assertTrue(service.getProperties().containsKey("tcp_nodelay"));
+ assertEquals("true", service.getProperties().get("tcp_nodelay"));
+
+ String nopasswd = "amqp://guest:********@client/localhost?maxprefetch='1'&brokerlist='tcp://localhost:1234?tcp_nodelay='true''";
+ String tostring = connectionurl.toString();
+ assertEquals(tostring.indexOf("maxprefetch"), tostring.lastIndexOf("maxprefetch"));
+ assertEquals(nopasswd, tostring);
+ }
public void testSingleTransportMultiOptionURL() throws URLSyntaxException
{
@@ -338,7 +366,7 @@ public class ConnectionURLTest extends T
assertTrue(connectionurl.getPassword().equals("pass"));
assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getClientName().equals("client_id"));
-
+
assertTrue(connectionurl.getBrokerCount() == 1);
}
@@ -457,7 +485,6 @@ public class ConnectionURLTest extends T
assertTrue(service.getTransport().equals("tcp"));
-
assertTrue(service.getHost().equals("localhost"));
assertTrue(service.getPort() == 5672);
assertEquals("jim",service.getProperty("foo"));
@@ -468,7 +495,7 @@ public class ConnectionURLTest extends T
assertTrue(connectionurl.getOption("timeout").equals("200"));
assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
}
-
+
/**
* Test that options other than failover and brokerlist are returned in the string representation.
* <p>
@@ -477,7 +504,7 @@ public class ConnectionURLTest extends T
public void testOptionToString() throws Exception
{
ConnectionURL url = new AMQConnectionURL("amqp://user:pass@temp/test?maxprefetch='12345'&brokerlist='tcp://localhost:5672'");
-
+
assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'"));
}
@@ -493,10 +520,10 @@ public class ConnectionURLTest extends T
assertTrue(connectionurl.getBrokerCount() == 1);
BrokerDetails service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getTransport().equals("tcp"));
assertTrue(service.getHost().equals("under_score"));
assertTrue(service.getPort() == 6672);
-
+
url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'";
connectionurl = new AMQConnectionURL(url);
@@ -507,11 +534,44 @@ public class ConnectionURLTest extends T
assertTrue(connectionurl.getBrokerCount() == 1);
service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getTransport().equals("tcp"));
assertTrue(service.getHost().equals("under_score"));
assertTrue(service.getPort() == 5672);
}
-
+
+
+ public void testRejectBehaviourPresent() throws Exception
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&rejectbehaviour='server'";
+
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertTrue(connectionURL.getFailoverMethod() == null);
+ assertTrue(connectionURL.getUsername().equals("guest"));
+ assertTrue(connectionURL.getPassword().equals("guest"));
+ assertTrue(connectionURL.getVirtualHost().equals("/test"));
+
+ //check that the reject behaviour option is returned as expected
+ assertEquals("Reject behaviour option was not as expected", "server",
+ connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
+ }
+
+ public void testRejectBehaviourNotPresent() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ //check that the reject behaviour option is null as expected
+ assertNull("Reject behaviour option was not as expected",
+ connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Thu Mar 1 12:51:40 2012
@@ -22,8 +22,11 @@ package org.apache.qpid.test.unit.client
import junit.framework.TestCase;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,6 +193,67 @@ public class DestinationURLTest extends
assertTrue(dest.getQueueName().equals("test:testQueueD"));
}
+ public void testRejectBehaviourPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?rejectbehaviour='server'";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ //check that the MaxDeliveryCount property has the right value
+ assertEquals("server",burl.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR));
+
+ //check that the MaxDeliveryCount value is correctly returned from an AMQDestination
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertEquals("Reject behaviour is unexpected", RejectBehaviour.SERVER, dest.getRejectBehaviour());
+ }
+
+ public void testRejectBehaviourNotPresent() throws URISyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue";
+
+ AMQBindingURL burl = new AMQBindingURL(url);
+
+ assertTrue(url.equals(burl.toString()));
+
+ assertTrue(burl.getExchangeClass().equals("exchangeClass"));
+ assertTrue(burl.getExchangeName().equals("exchangeName"));
+ assertTrue(burl.getDestinationName().equals("Destination"));
+ assertTrue(burl.getQueueName().equals("Queue"));
+
+ class MyTestAMQDestination extends AMQDestination
+ {
+ public MyTestAMQDestination(BindingURL url)
+ {
+ super(url);
+ }
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+ };
+
+ AMQDestination dest = new MyTestAMQDestination(burl);
+ assertNull("Reject behaviour is unexpected", dest.getRejectBehaviour());
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DestinationURLTest.class);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Thu Mar 1 12:51:40 2012
@@ -102,6 +102,20 @@ public class ClientProperties
*/
public static final int DEFAULT_SYNC_OPERATION_TIMEOUT = 60000;
+ /**
+ * System properties to change the default value used for TCP_NODELAY
+ */
+ public static final String QPID_TCP_NODELAY_PROP_NAME = "qpid.tcp_nodelay";
+ public static final String AMQJ_TCP_NODELAY_PROP_NAME = "amqj.tcp_nodelay";
+
+ /**
+ * System property to set the reject behaviour. default value will be 'normal' but can be
+ * changed to 'server' in which case the server decides whether a message should be requeued
+ * or dead lettered.
+ * This can be overridden by the more specific settings at connection or binding URL level.
+ */
+ public static final String REJECT_BEHAVIOUR_PROP_NAME = "qpid.reject.behaviour";
+
/*
public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Thu Mar 1 12:51:40 2012
@@ -22,6 +22,8 @@ package org.apache.qpid.transport;
import java.util.Map;
+import org.apache.qpid.configuration.ClientProperties;
+
/**
* A ConnectionSettings object can only be associated with
* one Connection object. I have added an assertion that will
@@ -38,7 +40,8 @@ public class ConnectionSettings
String username = "guest";
String password = "guest";
int port = 5672;
- boolean tcpNodelay = Boolean.getBoolean("amqj.tcp_nodelay");
+ boolean tcpNodelay = Boolean.valueOf(System.getProperty(ClientProperties.QPID_TCP_NODELAY_PROP_NAME,
+ System.getProperty(ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME, "true")));
int maxChannelCount = 32767;
int maxFrameSize = 65535;
int heartbeatInterval;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Thu Mar 1 12:51:40 2012
@@ -60,7 +60,8 @@ public class IoNetworkTransport implemen
LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
-
+ LOGGER.debug("TCP_NODELAY : %s", _socket.getTcpNoDelay());
+
InetAddress address = InetAddress.getByName(settings.getHost());
_socket.connect(new InetSocketAddress(address, settings.getPort()));
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Mar 1 12:51:40 2012
@@ -33,6 +33,8 @@ import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLSocket;
+
/**
* IoReceiver
*
@@ -94,7 +96,7 @@ final class IoReceiver implements Runnab
{
try
{
- if (shutdownBroken)
+ if (shutdownBroken || socket instanceof SSLSocket)
{
socket.close();
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java Thu Mar 1 12:51:40 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.transport.network.security;
import org.apache.qpid.ssl.SSLContextFactory;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Thu Mar 1 12:51:40 2012
@@ -37,6 +37,15 @@ public interface BindingURL
public static final String OPTION_ROUTING_KEY = "routingkey";
public static final String OPTION_BINDING_KEY = "bindingkey";
+ /**
+ * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
+ * <p>
+ * It tells the client to delegate the requeue/DLQ decision to the
+ * server .If this option is not specified, the messages won't be moved to
+ * the DLQ (or dropped) when delivery count exceeds the maximum.
+ */
+ public static final String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour";
+
String getURL();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/default.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/default.properties?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/default.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/default.properties Thu Mar 1 12:51:40 2012
@@ -1,2 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Used by FileUtilsTests
src=default.properties
\ No newline at end of file
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/mydefaults.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/mydefaults.properties?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/mydefaults.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/test/java/org/apache/qpid/util/mydefaults.properties Thu Mar 1 12:51:40 2012
@@ -1,2 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Used by FileUtilsTests
src=mydefaults
\ No newline at end of file
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/management/LoggingManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747869,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java Thu Mar 1 12:51:40 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.management.commo
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -118,6 +119,24 @@ public interface ManagedBroker
throws IOException, JMException, MBeanException;
/**
+ * Create a new Queue in the VirtualHost
+ *
+ * @since Qpid JMX API 2.4
+ * @param queueName name of the new queue
+ * @param durable true if the queue should be durable
+ * @param owner owner
+ * @param arguments declaration arguments for use when creating the queue, may be null.
+ * @throws IOException
+ * @throws JMException
+ */
+ @MBeanOperation(name="createNewQueue", description="Create a new Queue in the VirtualHost", impact= MBeanOperationInfo.ACTION)
+ void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName,
+ @MBeanOperationParameter(name="owner", description="Owner name")String owner,
+ @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable,
+ @MBeanOperationParameter(name="arguments", description="Map of arguments")Map<String,Object> arguments)
+ throws IOException, JMException;
+
+ /**
* Unregisters the Queue bindings, removes the subscriptions and unregisters
* from the managed objects.
*
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java Thu Mar 1 12:51:40 2012
@@ -80,33 +80,12 @@ public interface ManagedConnection
Date getLastIoTime();
/**
- * Tells the total number of bytes written till now.
- * @return number of bytes written.
- *
- @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
- Long getWrittenBytes();
- */
- /**
- * Tells the total number of bytes read till now.
- * @return number of bytes read.
- *
- @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
- Long getReadBytes();
- */
-
- /**
* Threshold high value for no of channels. This is useful in setting notifications or
* taking required action is there are more channels being created.
* @return threshold limit for no of channels
*/
- Long getMaximumNumberOfChannels();
-
- /**
- * Sets the threshold high value for number of channels for a connection
- * @param value
- */
@MBeanAttribute(name="MaximumNumberOfChannels", description="The threshold high value for number of channels for this connection")
- void setMaximumNumberOfChannels(Long value);
+ Long getMaximumNumberOfChannels();
//********** Operations *****************//
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java Thu Mar 1 12:51:40 2012
@@ -50,7 +50,8 @@ public interface ManagedQueue
String MSG_SIZE = "Size(bytes)";
String MSG_REDELIVERED = "Redelivered";
String MSG_QUEUE_POS = "Queue Position";
- List<String> VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC = Collections.unmodifiableList(Arrays.asList(MSG_AMQ_ID, MSG_HEADER, MSG_SIZE, MSG_REDELIVERED, MSG_QUEUE_POS));
+ String MSG_DELIVERY_COUNT = "Delivery Count";
+ List<String> VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC = Collections.unmodifiableList(Arrays.asList(MSG_AMQ_ID, MSG_HEADER, MSG_SIZE, MSG_REDELIVERED, MSG_QUEUE_POS, MSG_DELIVERY_COUNT));
List<String> VIEW_MSGS_TABULAR_UNIQUE_INDEX = Collections.unmodifiableList(Arrays.asList(MSG_QUEUE_POS));
//CompositeType key/description information for message content
@@ -67,6 +68,7 @@ public interface ManagedQueue
static final String ATTR_MAX_MSG_COUNT = "MaximumMessageCount";
static final String ATTR_MAX_QUEUE_DEPTH = "MaximumQueueDepth";
static final String ATTR_MAX_MSG_SIZE = "MaximumMessageSize";
+ static final String ATTR_MAXIMUM_DELIVERY_COUNT = "MaximumDeliveryCount";
static final String ATTR_DURABLE = "Durable";
static final String ATTR_AUTODELETE = "AutoDelete";
static final String ATTR_CONSUMER_COUNT = "ConsumerCount";
@@ -78,7 +80,8 @@ public interface ManagedQueue
static final String ATTR_FLOW_OVERFULL = "FlowOverfull";
static final String ATTR_FLOW_RESUME_CAPACITY = "FlowResumeCapacity";
static final String ATTR_EXCLUSIVE = "Exclusive";
-
+ static final String ATTR_ALT_EXCHANGE = "AlternateExchange";
+
//All attribute names constant
static final List<String> QUEUE_ATTRIBUTES
= Collections.unmodifiableList(
@@ -91,6 +94,7 @@ public interface ManagedQueue
ATTR_MAX_MSG_COUNT,
ATTR_MAX_QUEUE_DEPTH,
ATTR_MAX_MSG_SIZE,
+ ATTR_MAXIMUM_DELIVERY_COUNT,
ATTR_DURABLE,
ATTR_AUTODELETE,
ATTR_CONSUMER_COUNT,
@@ -101,7 +105,9 @@ public interface ManagedQueue
ATTR_CAPACITY,
ATTR_FLOW_OVERFULL,
ATTR_FLOW_RESUME_CAPACITY,
- ATTR_EXCLUSIVE))));
+ ATTR_EXCLUSIVE,
+ ATTR_ALT_EXCHANGE
+ ))));
/**
* Returns the Name of the ManagedQueue.
@@ -120,6 +126,16 @@ public interface ManagedQueue
Integer getMessageCount() throws IOException;
/**
+ * Maximum number of times a message is permitted to be delivered or zero if not enforced.
+ *
+ * @since Qpid JMX API 2.4
+ * @return maximum delivery count
+ * @throws IOException
+ */
+ @MBeanAttribute(name="MaximumDeliveryCount", description = "Maximum number of times a message is permitted to be delivered or zero if not enforced")
+ Integer getMaximumDeliveryCount() throws IOException;
+
+ /**
* Tells the total number of messages receieved by the queue since startup.
* @return total number of messages received.
* @throws IOException
@@ -309,7 +325,7 @@ public interface ManagedQueue
/**
* Sets whether the queue is exclusive or not.
- *
+ *
* @since Qpid JMX API 2.0
* @param exclusive the capacity in bytes
* @throws IOException
@@ -318,6 +334,25 @@ public interface ManagedQueue
@MBeanAttribute(name="Exclusive", description="Whether the queue is Exclusive or not")
void setExclusive(boolean exclusive) throws IOException, JMException;
+ /**
+ * Sets the Alternate Exchange for the queue, for use in dead letter queue functionality.
+ *
+ * @since Qpid JMX API 2.4
+ * @param the name of the exchange to use. Specifying null or the empty string will clear the alternate exchange.
+ * @throws IOException
+ */
+ void setAlternateExchange(String exchangeName) throws IOException;
+
+ /**
+ * Returns the name of the Alternate Exchange for the queue, or null if there isn't one.
+ *
+ * @since Qpid JMX API 2.4
+ * @return the name of the Alternate Exchange for the queue, or null if there isn't one
+ * @throws IOException
+ */
+ @MBeanAttribute(name="AlternateExchange", description="Alternate exchange for the queue")
+ String getAlternateExchange() throws IOException;
+
//********** Operations *****************//
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java Thu Mar 1 12:51:40 2012
@@ -47,7 +47,7 @@ public interface ServerInformation
* Qpid JMX API 1.1 can be assumed.
*/
int QPID_JMX_API_MAJOR_VERSION = 2;
- int QPID_JMX_API_MINOR_VERSION = 3;
+ int QPID_JMX_API_MINOR_VERSION = 4;
/**
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/test/java/org/apache/qpid/management/common/mbeans/ManagedQueueTest.java Thu Mar 1 12:51:40 2012
@@ -23,7 +23,6 @@ package org.apache.qpid.management.commo
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.util.ArrayList;
import java.util.List;
import javax.management.MBeanAttributeInfo;
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java Thu Mar 1 12:51:40 2012
@@ -47,7 +47,7 @@ public abstract class ApplicationRegistr
//max supported broker management interface supported by this release of the management console
public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 2;
- public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 3;
+ public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 4;
public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc";
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java Thu Mar 1 12:51:40 2012
@@ -579,7 +579,7 @@ public class NavigationView extends View
List<TreeObject> childNodes = typeNode.getChildren();
for (TreeObject child : childNodes)
{
- if (MBEAN.equals(child.getType()) && mbeanName.equals(child.getName()))
+ if (MBEAN.equals(child.getType()) && mbeanName != null && mbeanName.equals(child.getName()))
{
return true;
}
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org