You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC
svn commit: r824198 [7/9] - in /qpid/branches/java-network-refactor: ./
qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/
qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/
qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Sun Oct 11 23:22:08 2009
@@ -100,6 +100,18 @@
}
/**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
+ /**
* create an XA Session and start it if required.
*/
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
@@ -285,7 +297,6 @@
_qpidConnection.setIdleTimeout(l);
}
- @Override
public int getMaxChannelID()
{
return Integer.MAX_VALUE;
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Sun Oct 11 23:22:08 2009
@@ -191,6 +191,18 @@
}, _conn).execute();
}
+ /**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException thrown if there is a problem creating the session.
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
@@ -290,7 +302,6 @@
public void setIdleTimeout(long l){}
- @Override
public int getMaxChannelID()
{
return (int) (Math.pow(2, 16)-1);
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Oct 11 23:22:08 2009
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import java.io.Serializable;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -60,6 +59,7 @@
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -113,7 +113,6 @@
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
-
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -198,16 +197,32 @@
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+ protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
@@ -244,10 +259,10 @@
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
- private int _defaultPrefetchHighMark;
+ private int _prefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
- private int _defaultPrefetchLowMark;
+ private int _prefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
@@ -428,13 +443,13 @@
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetchHighMark = defaultPrefetchHighMark;
- _defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _prefetchHighMark = defaultPrefetchHighMark;
+ _prefetchLowMark = defaultPrefetchLowMark;
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -442,7 +457,7 @@
public void aboveThreshold(int currentValue)
{
_logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
+ "Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
_suspendState.set(true);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -452,7 +467,7 @@
public void underThreshold(int currentValue)
{
_logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
+ "Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
_suspendState.set(false);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -462,7 +477,7 @@
}
else
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
}
@@ -759,8 +774,16 @@
try
{
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
+ {
+ rollback();
+
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
+ }
+
- // TGM FIXME: what about failover?
// Acknowledge all delivered messages
while (true)
{
@@ -778,7 +801,7 @@
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
@@ -870,7 +893,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false,
messageSelector, null, true, true);
}
@@ -878,7 +901,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
false, false);
}
@@ -886,7 +909,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
false, false);
}
@@ -894,7 +917,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -903,7 +926,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -912,7 +935,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
messageSelector, null, false, false);
}
@@ -1336,17 +1359,17 @@
public int getDefaultPrefetch()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchHigh()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchLow()
{
- return _defaultPrefetchLowMark;
+ return _prefetchLowMark;
}
public AMQShortString getDefaultQueueExchangeName()
@@ -1491,6 +1514,8 @@
sendRecover();
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1559,6 +1584,14 @@
suspendChannel(true);
}
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+
+ syncDispatchQueue();
+
+ _dispatcher.rollback();
+
releaseForRollback();
sendRollback();
@@ -1851,26 +1884,58 @@
void failoverPrep()
{
- startDispatcherIfNecessary();
syncDispatchQueue();
}
void syncDispatchQueue()
{
- final CountDownLatch signal = new CountDownLatch(1);
- _queue.add(new Dispatchable() {
- public void dispatch(AMQSession ssn)
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!_closed.get() && !_queue.isEmpty())
{
- signal.countDown();
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
}
- });
- try
- {
- signal.await();
}
- catch (InterruptedException e)
+ else
{
- throw new RuntimeException(e);
+ startDispatcherIfNecessary();
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
@@ -2233,7 +2298,7 @@
private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, false);
+ return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
}
private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2704,15 +2769,26 @@
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
+ _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
}
- public void checkFlowControl() throws InterruptedException
+ public void checkFlowControl() throws InterruptedException, JMSException
{
+ long expiryTime = 0L;
synchronized (_flowControl)
{
- while (!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+ _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ if(!_flowControl.getFlowControl())
{
- _flowControl.wait();
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
}
}
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sun Oct 11 23:22:08 2009
@@ -414,9 +414,6 @@
public void releaseForRollback()
{
- startDispatcherIfNecessary();
- syncDispatchQueue();
- _dispatcher.rollback();
getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
_txRangeSet.clear();
_txSize = 0;
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sun Oct 11 23:22:08 2009
@@ -195,6 +195,12 @@
public void releaseForRollback()
{
+ // Reject all the messages that have been received in this session and
+ // have not yet been acknowledged. Should look to remove
+ // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+ // Otherwise messages will be able to arrive out of order to a second
+ // consumer on the queue. Whilst this is within the JMS spec it is not
+ // user friendly and avoidable.
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -205,11 +211,6 @@
rejectMessage(tag, true);
}
-
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
}
public void rejectMessage(long deliveryTag, boolean requeue)
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sun Oct 11 23:22:08 2009
@@ -779,6 +779,7 @@
else
{
_session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
}
break;
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sun Oct 11 23:22:08 2009
@@ -60,7 +60,7 @@
/**
* Priority of messages created by this producer.
*/
- private int _messagePriority;
+ private int _messagePriority = Message.DEFAULT_PRIORITY;
/**
* Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Sun Oct 11 23:22:08 2009
@@ -47,7 +47,7 @@
public synchronized XASession createXASession() throws JMSException
{
checkNotClosed();
- return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
+ return _delegate.createXASession();
}
//-- Interface XAQueueConnection
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Sun Oct 11 23:22:08 2009
@@ -39,7 +39,7 @@
* type: long
*/
public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
- public static final String MAX_PREFETCH_DEFAULT = "5000";
+ public static final String MAX_PREFETCH_DEFAULT = "500";
/**
* When true a sync command is sent after every persistent messages.
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Sun Oct 11 23:22:08 2009
@@ -21,6 +21,7 @@
package org.apache.qpid.client.failover;
import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
@@ -134,6 +135,7 @@
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
+ // Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Sun Oct 11 23:22:08 2009
@@ -39,6 +39,7 @@
private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance();
private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
@@ -159,7 +160,8 @@
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- return false;
+ _channelFlowMethodHandler.methodReceived(_session, body, channelId);
+ return true;
}
public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Sun Oct 11 23:22:08 2009
@@ -308,7 +308,6 @@
*/
public void exception(Throwable cause)
{
- _logger.info("AS: HELLO");
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Sun Oct 11 23:22:08 2009
@@ -253,7 +253,7 @@
}
else
{
- System.err.println("WARNING: new error arrived while old one not yet processed");
+ System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
}
try
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Sun Oct 11 23:22:08 2009
@@ -189,7 +189,8 @@
{
synchronized (_brokerListLock)
{
- return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ return _currentBrokerDetail;
}
}
@@ -214,7 +215,15 @@
broker.getHost().equals(_currentBrokerDetail.getHost()) &&
broker.getPort() == _currentBrokerDetail.getPort())
{
- return getNextBrokerDetails();
+ if (_connectionDetails.getBrokerCount() > 1)
+ {
+ return getNextBrokerDetails();
+ }
+ else
+ {
+ _failedAttemps ++;
+ return null;
+ }
}
String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
Modified: qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run Sun Oct 11 23:22:08 2009
@@ -111,6 +111,7 @@
fi
log $INFO System Properties set to $SYSTEM_PROPS
+log $INFO QPID_OPTS set to $QPID_OPTS
program=$(basename $0)
sourced=${BASH_SOURCE[0]}
Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java Sun Oct 11 23:22:08 2009
@@ -51,7 +51,6 @@
System.out.println("CLOSED");
}
- @Override
public void setIdleTimeout(long l)
{
// TODO Auto-generated method stub
Propchange: qpid/branches/java-network-refactor/qpid/java/lib/org.osgi.core_1.0.0.jar
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:443187-720930
-/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:805429-816233
+/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/client/src/main/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:443187-703176
-/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:805429-816233
+/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/client/src/test/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:443187-703176
-/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:805429-816233
+/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java Sun Oct 11 23:22:08 2009
@@ -110,7 +110,7 @@
* Reloads the log4j configuration file, applying any changes made.
*
* @throws IOException
- * @since Qpid JMX API 1.3
+ * @since Qpid JMX API 1.4
*/
@MBeanOperation(name = "reloadConfigFile", description = "Reload the log4j xml configuration file", impact = MBeanOperationInfo.ACTION)
void reloadConfigFile() throws IOException;
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:757268
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:757257
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java Sun Oct 11 23:22:08 2009
@@ -43,7 +43,7 @@
* Qpid JMX API 1.1 can be assumed.
*/
int QPID_JMX_API_MAJOR_VERSION = 1;
- int QPID_JMX_API_MINOR_VERSION = 3;
+ int QPID_JMX_API_MINOR_VERSION = 4;
/**
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:757268
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:757268
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:757268
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:757268
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:757268
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java Sun Oct 11 23:22:08 2009
@@ -48,7 +48,7 @@
//max supported broker management interface supported by this release of the management console
public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 1;
- public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 3;
+ public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 4;
public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc";
Modified: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java Sun Oct 11 23:22:08 2009
@@ -349,7 +349,7 @@
_logWatchIntervalLabel.setFont(ApplicationRegistry.getFont(FONT_BOLD));
_logWatchIntervalLabel.setLayoutData(new GridData(SWT.LEFT, SWT.CENTER, false, true));
- if(_ApiVersion.greaterThanOrEqualTo(1, 3))
+ if(_ApiVersion.greaterThanOrEqualTo(1, 4))
{
Group reloadConfigFileGroup = new Group(attributesComposite, SWT.SHADOW_NONE);
reloadConfigFileGroup.setBackground(attributesComposite.getBackground());
Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/module.xml?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/module.xml (original)
+++ qpid/branches/java-network-refactor/qpid/java/module.xml Sun Oct 11 23:22:08 2009
@@ -261,6 +261,7 @@
<jvmarg value="${jvm.args}"/>
<sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
+ <sysproperty key="amqj.server.logging.level" value="${amqj.server.logging.level}"/>
<sysproperty key="amqj.protocol.logging.level" value="${amqj.protocol.logging.level}"/>
<sysproperty key="log4j.debug" value="${log4j.debug}"/>
<sysproperty key="root.logging.level" value="${root.logging.level}"/>
@@ -269,6 +270,7 @@
<sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/>
<sysproperty key="broker" value="${broker}"/>
<sysproperty key="broker.clean" value="${broker.clean}"/>
+ <sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/>
<sysproperty key="broker.version" value="${broker.version}"/>
<sysproperty key="broker.ready" value="${broker.ready}" />
<sysproperty key="broker.stopped" value="${broker.stopped}" />
Propchange: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:443187-707694
-/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:805429-816233
+/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java Sun Oct 11 23:22:08 2009
@@ -78,15 +78,23 @@
// Add an invalid value
_broker += " -l invalid";
- // The release-bin build of the broker uses this log4j configuration
- // so set up the broker environment to use it for this test.
- // Also include -Dlog4j.debug so we can validate that it picked up this config
- setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug -Dlog4j.configuration=file:" + System.getProperty(QPID_HOME) + "/../broker/src/main/java/log4j.properties");
+ // The broker has a built in default log4j configuration set up
+ // so if the the broker cannot load the -l value it will use default
+ // use this default. Test that this is correctly loaded, by
+ // including -Dlog4j.debug so we can validate.
+ setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug");
// Disable all client logging so we can test for broker DEBUG only.
- Logger.getRootLogger().setLevel(Level.WARN);
- Logger.getLogger("qpid.protocol").setLevel(Level.WARN);
- Logger.getLogger("org.apache.qpid").setLevel(Level.WARN);
+ setLoggerLevel(Logger.getRootLogger(), Level.WARN);
+ setLoggerLevel(Logger.getLogger("qpid.protocol"), Level.WARN);
+ setLoggerLevel(Logger.getLogger("org.apache.qpid"), Level.WARN);
+
+ // Set the broker to use info level logging, which is the qpid-server
+ // default. Rather than debug which is the test default.
+ setBrokerOnlySystemProperty("amqj.server.logging.level", "info");
+ // Set the logging defaults to info for this test.
+ setBrokerOnlySystemProperty("amqj.logging.level", "info");
+ setBrokerOnlySystemProperty("root.logging.level", "info");
startBroker();
Copied: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (from r824132, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java?p2=qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java&r1=824132&r2=824198&rev=824198&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java Sun Oct 11 23:22:08 2009
@@ -188,8 +188,7 @@
// Send IO Exception - causing failover
_connection.getProtocolHandler().
- exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(),
- new WriteTimeoutException("WriteTimeoutException to cause failover."));
+ exception(new WriteTimeoutException("WriteTimeoutException to cause failover."));
// Verify Failover occured through ConnectionListener
assertTrue("Failover did not occur",
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java Sun Oct 11 23:22:08 2009
@@ -160,7 +160,7 @@
// Set the broker.ready string to check for the _log4j default that
// is still present on standard out.
- System.setProperty(BROKER_READY, "Qpid Broker Ready");
+ setTestClientSystemProperty(BROKER_READY, "Qpid Broker Ready");
startBroker();
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java Sun Oct 11 23:22:08 2009
@@ -63,7 +63,6 @@
*/
public class DeepQueueConsumeWithSelector extends QpidTestCase implements MessageListener
{
- private static final String INDEX = "index";
private static final int MESSAGE_COUNT = 10000;
private static final int BATCH_SIZE = MESSAGE_COUNT / 10;
@@ -129,9 +128,7 @@
@Override
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- Message message = session.createTextMessage("Message :" + msgCount);
-
- message.setIntProperty(INDEX, msgCount);
+ Message message = super.createNextMessage(session,msgCount);
if ((msgCount % BATCH_SIZE) == 0 )
{
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java Sun Oct 11 23:22:08 2009
@@ -21,27 +21,34 @@
package org.apache.qpid.server.security.acl;
-
-import junit.framework.TestCase;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.*;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.naming.NamingException;
-
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -54,6 +61,14 @@
public void setUp() throws Exception
{
+ //Performing setUp here would result in a broker with the default ACL test config
+
+ //Each test now calls the private setUpACLTest to allow them to make
+ //individual customisations to the base ACL settings
+ }
+
+ private void setUpACLTest() throws Exception
+ {
final String QPID_HOME = System.getProperty("QPID_HOME");
if (QPID_HOME == null)
@@ -73,8 +88,10 @@
return "amqp://" + username + ":" + password + "@clientid/test?brokerlist='" + getBroker() + "?retries='0''";
}
- public void testAccessAuthorized() throws AMQException, URLSyntaxException
+ public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -96,6 +113,8 @@
public void testAccessNoRights() throws Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("guest", "guest");
@@ -120,8 +139,40 @@
}
}
- public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException
+ public void testGuestConsumeWithCreateRightsAndWithoutConsumeRights() throws NamingException, ConfigurationException, IOException, Exception
+ {
+ //Customise the ACL config to give the guest user some create (could be any, non-consume) rights to
+ //force creation of a PrincipalPermissions instance to perform the consume rights check against.
+ setConfigurationProperty("virtualhosts.virtualhost.test.security.access_control_list.create.queues.queue.users.user", "guest");
+
+ setUpACLTest();
+
+ try
+ {
+ Connection conn = getConnection("guest", "guest");
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createQueue("example.RequestQueue"));
+
+ conn.close();
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+
+ assertNotNull("There was no liked exception", cause);
+ assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -142,6 +193,8 @@
public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -167,8 +220,10 @@
}
}
- public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException
+ public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -191,6 +246,8 @@
public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -212,8 +269,10 @@
}
}
- public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException
+ public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -239,8 +298,10 @@
}
}
- public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException
+ public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -271,6 +332,8 @@
public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -311,8 +374,10 @@
assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
}
- public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
+ public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -333,6 +398,8 @@
public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -358,6 +425,8 @@
public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -391,8 +460,10 @@
return (Connection) connection;
}
- public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException
+ public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -414,6 +485,8 @@
public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -436,6 +509,8 @@
public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -461,6 +536,8 @@
public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
{
+ setUpACLTest();
+
Connection connection = null;
try
{
@@ -492,6 +569,8 @@
*/
public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
//Set up the Server
Connection serverConnection = getConnection("server", "guest");
@@ -572,6 +651,8 @@
public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java Sun Oct 11 23:22:08 2009
@@ -61,7 +61,7 @@
setupSession();
- _queue = _clientSession.createQueue(getName()+System.currentTimeMillis());
+ _queue = _clientSession.createQueue(getTestQueueName());
_clientSession.createConsumer(_queue).close();
//Ensure there are no messages on the queue to start with.
@@ -497,7 +497,7 @@
if (msgCount == failPoint)
{
- failBroker();
+ failBroker(getFailingPort());
}
}
@@ -529,7 +529,7 @@
sendMessages("connection2", messages);
}
- failBroker();
+ failBroker(getFailingPort());
checkQueueDepth(messages);
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java Sun Oct 11 23:22:08 2009
@@ -22,64 +22,172 @@
import org.apache.qpid.test.utils.*;
import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
/**
- * RollbackOrderTest
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ * - Validating that the first message received is always message 1.
+ * - Receive a few more so that there are a few messages to reject.
+ * - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
*
*/
-
public class RollbackOrderTest extends QpidTestCase
{
- private Connection conn;
- private Queue queue;
- private Session ssn;
- private MessageProducer prod;
- private MessageConsumer cons;
+ private Connection _connection;
+ private Queue _queue;
+ private Session _session;
+ private MessageConsumer _consumer;
@Override public void setUp() throws Exception
{
super.setUp();
- conn = getConnection();
- conn.start();
- ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- queue = ssn.createQueue("rollback-order-test-queue");
- prod = ssn.createProducer(queue);
- cons = ssn.createConsumer(queue);
- for (int i = 0; i < 5; i++)
- {
- TextMessage msg = ssn.createTextMessage("message " + (i+1));
- prod.send(msg);
- }
- ssn.commit();
+ _connection = getConnection();
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue(getTestQueueName());
+ _consumer = _session.createConsumer(_queue);
+
+ //Send more messages so it is more likely that the dispatcher is
+ // processing on rollback.
+ sendMessage(_session, _queue, 50);
+ _session.commit();
+
}
public void testOrderingAfterRollback() throws Exception
{
- for (int i = 0; i < 10; i++)
+ //Start the session now so we
+ _connection.start();
+
+ for (int i = 0; i < 20; i++)
{
- TextMessage msg = (TextMessage) cons.receive();
- assertEquals("message 1", msg.getText());
- ssn.rollback();
+ Message msg = _consumer.receive();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ // Pull additional messages through so we have some reject work to do
+ for (int m=0; m < 5 ; m++)
+ {
+ _consumer.receive();
+ }
+
+ System.err.println("ROT-Rollback");
+ _logger.warn("ROT-Rollback");
+ _session.rollback();
}
}
- @Override public void tearDown() throws Exception
+ public void testOrderingAfterRollbackOnMessage() throws Exception
{
- while (true)
+ final CountDownLatch count= new CountDownLatch(20);
+ final Exception exceptions[] = new Exception[20];
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ _consumer.setMessageListener(new MessageListener()
{
- Message msg = cons.receiveNoWait();
- if (msg == null)
+
+ public void onMessage(Message message)
{
- break;
+
+ Message msg = message;
+ try
+ {
+ count.countDown();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ _session.rollback();
+ }
+ catch (JMSException e)
+ {
+ System.out.println("Error:" + e.getMessage());
+ exceptions[(int)count.getCount()] = e;
+ }
+ catch (AssertionFailedError cf)
+ {
+ // End Test if Equality test fails
+ while (count.getCount() != 0)
+ {
+ count.countDown();
+ }
+
+ System.out.println("Error:" + cf.getMessage());
+ System.err.println(cf.getMessage());
+ cf.printStackTrace();
+ failed.set(true);
+ }
}
- else
+ });
+ //Start the session now so we
+ _connection.start();
+
+ count.await();
+
+ for (Exception e : exceptions)
+ {
+ if (e != null)
{
- msg.acknowledge();
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ failed.set(true);
}
}
- ssn.commit();
+
+// _consumer.close();
+ _connection.close();
+
+ assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+ }
+
+ @Override public void tearDown() throws Exception
+ {
+
+ drainQueue(_queue);
+
super.tearDown();
}
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java Sun Oct 11 23:22:08 2009
@@ -37,7 +37,6 @@
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -58,13 +57,12 @@
private Session consumerSession;
private MessageConsumer consumer;
- private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
- private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
private int seed;
private Random rand;
-
+ private int _currentPort = getFailingPort();
+
@Override
protected void setUp() throws Exception
{
@@ -227,7 +225,7 @@
_logger.info("Failing over");
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
// Check that you produce and consume the rest of messages.
_logger.debug("==================");
@@ -242,10 +240,10 @@
_logger.debug("==================");
}
- private void causeFailure(long delay)
+ private void causeFailure(int port, long delay)
{
- failBroker();
+ failBroker(port);
_logger.info("Awaiting Failover completion");
try
@@ -268,7 +266,7 @@
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -314,7 +312,7 @@
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
- causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
@@ -344,15 +342,15 @@
_logger.debug("===================================================================");
runP2PFailover(numMessages, false,false, false);
- startBroker(getFailingPort());
+ startBroker(_currentPort);
if (useAltPort)
{
- setFailingPort(altPort);
+ _currentPort = altPort;
useAltPort = false;
}
else
{
- setFailingPort(stdPort);
+ _currentPort = stdPort;
useAltPort = true;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org