You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/15 14:54:37 UTC
svn commit: r1214760 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/flow/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/queue/
broker/src/main/java/org/apache/qpid/server/subs...
Author: rgodfrey
Date: Thu Dec 15 13:54:36 2011
New Revision: 1214760
URL: http://svn.apache.org/viewvc?rev=1214760&view=rev
Log:
QPID-3687 : Improve Java Broker performance
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java Thu Dec 15 13:54:36 2011
@@ -45,6 +45,15 @@ public class WindowCreditManager extends
}
+ public long getBytesCreditLimit()
+ {
+ return _bytesCreditLimit;
+ }
+
+ public long getMessageCreditLimit()
+ {
+ return _messageCreditLimit;
+ }
public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Thu Dec 15 13:54:36 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -47,7 +48,6 @@ import org.apache.qpid.AMQConnectionExce
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -87,16 +87,20 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.io.IoSender;
public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
@@ -139,7 +143,7 @@ public class AMQProtocolEngine implement
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
-
+ private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
@@ -173,6 +177,9 @@ public class AMQProtocolEngine implement
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private volatile boolean _deferFlush;
+ private long _lastReceivedTime;
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -240,14 +247,29 @@ public class AMQProtocolEngine implement
return _closing.get();
}
+ public synchronized void flushBatched()
+ {
+ _sender.flush();
+ }
+
+
+ public ClientDeliveryMethod createDeliveryMethod(int channelId)
+ {
+ return new WriteDeliverMethod(channelId);
+ }
+
public void received(final ByteBuffer msg)
{
- _lastIoTime = System.currentTimeMillis();
+ final long arrivalTime = System.currentTimeMillis();
+ _lastReceivedTime = arrivalTime;
+ _lastIoTime = arrivalTime;
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ final int len = dataBlocks.size();
+ for (int i = 0; i < len; i++)
{
+ AMQDataBlock dataBlock = dataBlocks.get(i);
try
{
dataBlockReceived(dataBlock);
@@ -347,7 +369,7 @@ public class AMQProtocolEngine implement
}
}
- private void protocolInitiationReceived(ProtocolInitiation pi)
+ private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
(_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
@@ -524,12 +546,15 @@ public class AMQProtocolEngine implement
*/
public synchronized void writeFrame(AMQDataBlock frame)
{
- _lastSent = frame;
+
final ByteBuffer buf = asByteBuffer(frame);
- _lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
- _sender.flush();
+ _lastIoTime = System.currentTimeMillis();
+ if(!_deferFlush)
+ {
+ _sender.flush();
+ }
}
public AMQShortString getContextKey()
@@ -918,7 +943,7 @@ public class AMQProtocolEngine implement
private void setProtocolVersion(ProtocolVersion pv)
{
_protocolVersion = pv;
-
+ _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
_protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
_dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
}
@@ -1023,7 +1048,7 @@ public class AMQProtocolEngine implement
public MethodRegistry getMethodRegistry()
{
- return MethodRegistry.getMethodRegistry(getProtocolVersion());
+ return _methodRegistry;
}
public MethodDispatcher getMethodDispatcher()
@@ -1052,7 +1077,7 @@ public class AMQProtocolEngine implement
// Nothing
}
- public void writerIdle()
+ public synchronized void writerIdle()
{
_sender.send(asByteBuffer(HeartbeatBody.FRAME));
}
@@ -1109,6 +1134,11 @@ public class AMQProtocolEngine implement
return _lastIoTime;
}
+ public long getLastReceivedTime()
+ {
+ return _lastReceivedTime;
+ }
+
public ProtocolSessionIdentifier getSessionIdentifier()
{
return _sessionIdentifier;
@@ -1402,9 +1432,215 @@ public class AMQProtocolEngine implement
return true;
}
+ public void setDeferFlush(boolean deferFlush)
+ {
+ _deferFlush = deferFlush;
+ }
+
+
+
@Override
public String getUserName()
{
return getAuthorizedPrincipal().getName();
}
+
+ private static class ByteBufferOutputStream extends OutputStream
+ {
+
+
+ private final ByteBuffer _buf;
+
+ public ByteBufferOutputStream(ByteBuffer buf)
+ {
+ _buf = buf;
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ _buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ _buf.put(b, off, len);
+ }
+ }
+
+ public final class WriteDeliverMethod
+ implements ClientDeliveryMethod
+ {
+ private final int _channelId;
+
+ public WriteDeliverMethod(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ registerMessageDelivered(entry.getMessage().getSize());
+ _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, sub.getConsumerTag());
+ entry.incrementDeliveryCount();
+ }
+
+ }
+
+ private static class BytesDataOutput implements DataOutput
+ {
+ int _pos = 0;
+ byte[] _buf;
+
+ public BytesDataOutput(byte[] buf)
+ {
+ _buf = buf;
+ }
+
+ public void setBuffer(byte[] buf)
+ {
+ _buf = buf;
+ _pos = 0;
+ }
+
+ public void reset()
+ {
+ _pos = 0;
+ }
+
+ public int length()
+ {
+ return _pos;
+ }
+
+ public void write(int b)
+ {
+ _buf[_pos++] = (byte) b;
+ }
+
+ public void write(byte[] b)
+ {
+ System.arraycopy(b, 0, _buf, _pos, b.length);
+ _pos+=b.length;
+ }
+
+
+ public void write(byte[] b, int off, int len)
+ {
+ System.arraycopy(b, off, _buf, _pos, len);
+ _pos+=len;
+
+ }
+
+ public void writeBoolean(boolean v)
+ {
+ _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+ }
+
+ public void writeByte(int v)
+ {
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeShort(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeChar(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeInt(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeLong(long v)
+ {
+ _buf[_pos++] = (byte) (v >>> 56);
+ _buf[_pos++] = (byte) (v >>> 48);
+ _buf[_pos++] = (byte) (v >>> 40);
+ _buf[_pos++] = (byte) (v >>> 32);
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte)v;
+ }
+
+ public void writeFloat(float v)
+ {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v)
+ {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ _buf[_pos++] = ((byte)s.charAt(i));
+ }
+ }
+
+ public void writeChars(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ int v = s.charAt(i);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ }
+
+ public void writeUTF(String s)
+ {
+ int strlen = s.length();
+
+ int pos = _pos;
+ _pos+=2;
+
+
+ for (int i = 0; i < strlen; i++)
+ {
+ int c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F))
+ {
+ c = s.charAt(i);
+ _buf[_pos++] = (byte) c;
+
+ }
+ else if (c > 0x07FF)
+ {
+ _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ else
+ {
+ _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ }
+
+ int len = _pos - (pos + 2);
+
+ _buf[pos++] = (byte) (len >>> 8);
+ _buf[pos] = (byte) len;
+ }
+
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Dec 15 13:54:36 2011
@@ -32,6 +32,7 @@ import org.apache.qpid.server.AMQChannel
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -49,6 +50,14 @@ public interface AMQProtocolSession exte
boolean isClosing();
+ void flushBatched();
+
+ void setDeferFlush(boolean defer);
+
+ ClientDeliveryMethod createDeliveryMethod(int channelId);
+
+ long getLastReceivedTime();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
@@ -77,15 +86,6 @@ public interface AMQProtocolSession exte
}
/**
- * Called when a protocol data block is received
- *
- * @param message the data block that has been received
- *
- * @throws Exception if processing the datablock fails
- */
- void dataBlockReceived(AMQDataBlock message) throws Exception;
-
- /**
* Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
* 6).
*
@@ -234,4 +234,5 @@ public interface AMQProtocolSession exte
List<AMQChannel> getChannels();
void mgmtCloseChannel(int channelId);
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Thu Dec 15 13:54:36 2011
@@ -100,6 +100,12 @@ public class ProtocolEngine_0_10 extend
return _network.getLocalAddress();
}
+ public void received(final ByteBuffer buf)
+ {
+ super.received(buf);
+ _connection.receivedComplete();
+ }
+
public long getReadBytes()
{
return _readBytes;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Thu Dec 15 13:54:36 2011
@@ -27,6 +27,11 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.queue.QueueRunner;
import org.apache.qpid.server.queue.SimpleAMQQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to subscriptions, which is necessary
@@ -37,33 +42,64 @@ public class QueueRunner implements Read
{
private static final Logger _logger = Logger.getLogger(QueueRunner.class);
- private final String _name;
private final SimpleAMQQueue _queue;
- public QueueRunner(SimpleAMQQueue queue, long count)
+ private static int IDLE = 0;
+ private static int SCHEDULED = 1;
+ private static int RUNNING = 2;
+
+
+ private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
+
+ private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+ private final AtomicBoolean _stateChange = new AtomicBoolean();
+
+ private final AtomicLong _lastRunAgain = new AtomicLong();
+ private final AtomicLong _lastRunTime = new AtomicLong();
+
+ private long _runs;
+ private long _continues;
+
+ public QueueRunner(SimpleAMQQueue queue)
{
_queue = queue;
- _name = "QueueRunner-" + count + "-" + queue.getLogActor();
}
+ private int trouble = 0;
+
public void run()
{
- String originalName = Thread.currentThread().getName();
- try
+ if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
{
- Thread.currentThread().setName(_name);
- CurrentActor.set(_queue.getLogActor());
+ long runAgain = Long.MIN_VALUE;
+ _stateChange.set(false);
+ try
+ {
+ CurrentActor.set(_queue.getLogActor());
+
+ runAgain = _queue.processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Exception during asynchronous delivery by " + toString(), e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ long stateChangeCount = _queue.getStateChangeCount();
+ _lastRunAgain.set(runAgain);
+ _lastRunTime.set(System.nanoTime());
+ if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
+ {
+ _continues++;
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ _queue.execute(this);
+ }
+ }
- _queue.processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error("Exception during asynchronous delivery by " + _name, e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
}
}
@@ -79,6 +115,21 @@ public class QueueRunner implements Read
public String toString()
{
- return _name;
+ return "QueueRunner-" + _queue.getLogActor();
+ }
+
+ public void execute(Executor executor)
+ {
+ _stateChange.set(true);
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ executor.execute(this);
+ }
+ }
+
+ public boolean isIdle()
+ {
+ return _scheduled.get() == IDLE;
}
-}
\ No newline at end of file
+
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Dec 15 13:54:36 2011
@@ -155,11 +155,11 @@ public class SimpleAMQQueue implements A
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
- static final int MAX_ASYNC_DELIVERIES = 10;
+ static final int MAX_ASYNC_DELIVERIES = 80;
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
- private AtomicReference<Runnable> _asynchronousRunner = new AtomicReference<Runnable>(null);
+
private final Executor _asyncDelivery;
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
@@ -584,33 +584,20 @@ public class SimpleAMQQueue implements A
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
+
incrementTxnEnqueueStats(message);
incrementQueueCount();
incrementQueueSize(message);
+
_totalMessagesReceived.incrementAndGet();
QueueEntry entry;
- Subscription exclusiveSub = _exclusiveSubscriber;
+ final Subscription exclusiveSub = _exclusiveSubscriber;
+ entry = _entries.add(message);
- if (exclusiveSub != null)
+ if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
- exclusiveSub.getSendLock();
-
- try
- {
- entry = _entries.add(message);
-
- deliverToSubscription(exclusiveSub, entry);
- }
- finally
- {
- exclusiveSub.releaseSendLock();
- }
- }
- else
- {
- entry = _entries.add(message);
/*
iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
@@ -666,7 +653,14 @@ public class SimpleAMQQueue implements A
{
checkSubscriptionsNotAheadOfDelivery(entry);
- deliverAsync();
+ if (exclusiveSub != null)
+ {
+ deliverAsync(exclusiveSub);
+ }
+ else
+ {
+ deliverAsync();
+ }
}
if(_managedObject != null)
@@ -685,30 +679,32 @@ public class SimpleAMQQueue implements A
throws AMQException
{
- sub.getSendLock();
- try
+ if(sub.trySendLock())
{
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
+ try
{
- if (!sub.wouldSuspend(entry))
+ if (subscriptionReadyAndHasInterest(sub, entry)
+ && !sub.isSuspended())
{
- if (sub.acquires() && !entry.acquire(sub))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.restoreCredit(entry);
- }
- else
+ if (!sub.wouldSuspend(entry))
{
- deliverMessage(sub, entry);
+ if (sub.acquires() && !entry.acquire(sub))
+ {
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry, false);
+ }
}
}
}
- }
- finally
- {
- sub.releaseSendLock();
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
}
@@ -752,7 +748,7 @@ public class SimpleAMQQueue implements A
_byteTxnDequeues.addAndGet(entry.getSize());
}
- private void deliverMessage(final Subscription sub, final QueueEntry entry)
+ private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -760,7 +756,7 @@ public class SimpleAMQQueue implements A
_deliveredMessages.incrementAndGet();
incrementUnackedMsgCount();
- sub.send(entry);
+ sub.send(entry, batch);
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -866,7 +862,7 @@ public class SimpleAMQQueue implements A
{
if (!subscription.isClosed())
{
- deliverMessage(subscription, entry);
+ deliverMessage(subscription, entry, false);
return true;
}
else
@@ -1008,6 +1004,12 @@ public class SimpleAMQQueue implements A
_exclusiveSubscriber = exclusiveSubscriber;
}
+ long getStateChangeCount()
+ {
+ return _stateChangeCount.get();
+ }
+
+
public static interface QueueEntryFilter
{
public boolean accept(QueueEntry entry);
@@ -1308,7 +1310,7 @@ public class SimpleAMQQueue implements A
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
- ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
while (queueListIterator.advance())
{
@@ -1331,7 +1333,7 @@ public class SimpleAMQQueue implements A
private void dequeueEntry(final QueueEntry node)
{
- ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
dequeueEntry(node, txn);
}
@@ -1408,7 +1410,7 @@ public class SimpleAMQQueue implements A
}
});
- ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
if(_alternateExchange != null)
{
@@ -1577,26 +1579,34 @@ public class SimpleAMQQueue implements A
}
}
+ private QueueRunner _queueRunner = new QueueRunner(this);
public void deliverAsync()
{
- QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
+ _stateChangeCount.incrementAndGet();
+
+ _queueRunner.execute(_asyncDelivery);
- if (_asynchronousRunner.compareAndSet(null, runner))
- {
- _asyncDelivery.execute(runner);
- }
}
public void deliverAsync(Subscription sub)
{
- SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
- if(flusher == null)
+ //_stateChangeCount.incrementAndGet();
+ if(_exclusiveSubscriber == null)
{
- flusher = new SubFlushRunner(sub);
- sub.set(SUB_FLUSH_RUNNER, flusher);
+ deliverAsync();
}
- _asyncDelivery.execute(flusher);
+ else
+ {
+ SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
+ if(flusher == null)
+ {
+ flusher = new SubFlushRunner(sub);
+ sub.set(SUB_FLUSH_RUNNER, flusher);
+ }
+ flusher.execute(_asyncDelivery);
+ }
+
}
public void flushSubscription(Subscription sub) throws AMQException
@@ -1612,31 +1622,56 @@ public class SimpleAMQQueue implements A
public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
{
boolean atTail = false;
+ final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
- while (!sub.isSuspended() && !atTail && iterations != 0)
+ try
{
- try
+ if(keepSendLockHeld)
{
sub.getSendLock();
- atTail = attemptDelivery(sub);
- if (atTail && sub.isAutoClose())
+ }
+ while (!sub.isSuspended() && !atTail && iterations != 0)
+ {
+ try
{
- unregisterSubscription(sub);
+ if(!keepSendLockHeld)
+ {
+ sub.getSendLock();
+ }
- sub.confirmAutoClose();
+ atTail = attemptDelivery(sub, true);
+ if (atTail && !sub.isSuspended() && sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
+
+ sub.confirmAutoClose();
+ }
+ else if (!atTail)
+ {
+ iterations--;
+ }
}
- else if (!atTail)
+ finally
{
- iterations--;
+ if(!keepSendLockHeld)
+ {
+ sub.releaseSendLock();
+ }
}
}
- finally
+ }
+ finally
+ {
+ if(keepSendLockHeld)
{
sub.releaseSendLock();
}
+ sub.flushBatched();
+
}
+
// if there's (potentially) more than one subscription the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
@@ -1653,11 +1688,13 @@ public class SimpleAMQQueue implements A
*
* Looks up the next node for the subscription and attempts to deliver it.
*
+ *
* @param sub
+ * @param batch
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(Subscription sub) throws AMQException
+ private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1681,7 +1718,7 @@ public class SimpleAMQQueue implements A
}
else
{
- deliverMessage(sub, node);
+ deliverMessage(sub, node, batch);
}
}
@@ -1785,23 +1822,26 @@ public class SimpleAMQQueue implements A
* @param runner the Runner to schedule
* @throws AMQException
*/
- public void processQueue(QueueRunner runner) throws AMQException
+ public long processQueue(QueueRunner runner) throws AMQException
{
- long stateChangeCount;
+ long stateChangeCount = Long.MIN_VALUE;
long previousStateChangeCount = Long.MIN_VALUE;
+ long rVal = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- _asynchronousRunner.compareAndSet(runner, null);
+ final int numSubs = _subscriptionList.size();
+
+ final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
// For every message enqueue/requeue the we fire deliveryAsync() which
// increases _stateChangeCount. If _sCC changes whilst we are in our loop
// (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
// then we will continue to run for a maximum of iterations.
// So whilst delivery/rejection is going on a processQueue thread will be running
- while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
+ while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
// further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1812,6 +1852,7 @@ public class SimpleAMQQueue implements A
//further asynchronous delivery is required since the
//previous loop. keep going if iteration slicing allows.
lastLoop = false;
+ rVal = stateChangeCount;
}
previousStateChangeCount = stateChangeCount;
@@ -1824,33 +1865,47 @@ public class SimpleAMQQueue implements A
{
Subscription sub = subscriptionIter.getNode().getSubscription();
sub.getSendLock();
- try
- {
- //attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub);
- if (subscriptionDone)
+
+ try
{
- //close autoClose subscriptions if we are not currently intent on continuing
- if (lastLoop && sub.isAutoClose())
+ for(int i = 0 ; i < perSub; i++)
{
- unregisterSubscription(sub);
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ subscriptionDone = attemptDelivery(sub, true);
+ if (subscriptionDone)
+ {
+ sub.flushBatched();
+ //close autoClose subscriptions if we are not currently intent on continuing
+ if (lastLoop && !sub.isSuspended() && sub.isAutoClose())
+ {
+
+ unregisterSubscription(sub);
+
+ sub.confirmAutoClose();
+ }
+ break;
+ }
+ else
+ {
+ //this subscription can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allSubscriptionsDone = false;
+ lastLoop = false;
+ if(--iterations == 0)
+ {
+ sub.flushBatched();
+ break;
+ }
+ }
- sub.confirmAutoClose();
}
+
+ sub.flushBatched();
}
- else
+ finally
{
- //this subscription can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
- lastLoop = false;
- iterations--;
+ sub.releaseSendLock();
}
- }
- finally
- {
- sub.releaseSendLock();
- }
}
if(allSubscriptionsDone && lastLoop)
@@ -1876,24 +1931,24 @@ public class SimpleAMQQueue implements A
deliveryIncomplete = true;
}
- _asynchronousRunner.set(null);
}
// If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
+ if (iterations == 0)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Rescheduling runner:" + runner);
}
- _asyncDelivery.execute(runner);
+ return 0L;
}
+ return rVal;
+
}
public void checkMessageStatus() throws AMQException
{
-
QueueEntryIterator queueListIterator = _entries.iterator();
while (queueListIterator.advance())
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Thu Dec 15 13:54:36 2011
@@ -27,6 +27,10 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
class SubFlushRunner implements ReadWriteRunnable
{
@@ -34,29 +38,33 @@ class SubFlushRunner implements ReadWrit
private final Subscription _sub;
- private final String _name;
+
+ private static int IDLE = 0;
+ private static int SCHEDULED = 1;
+ private static int RUNNING = 2;
+
+
+ private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
+
+
private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+ private final AtomicBoolean _stateChange = new AtomicBoolean();
public SubFlushRunner(Subscription sub)
{
_sub = sub;
- _name = "SubFlushRunner-"+_sub;
}
public void run()
{
-
- String originalName = Thread.currentThread().getName();
- try
+ if(_scheduled.compareAndSet(SCHEDULED, RUNNING))
{
- Thread.currentThread().setName(_name);
-
boolean complete = false;
+ _stateChange.set(false);
try
{
CurrentActor.set(_sub.getLogActor());
complete = getQueue().flushSubscription(_sub, ITERATIONS);
-
}
catch (AMQException e)
{
@@ -66,17 +74,15 @@ class SubFlushRunner implements ReadWrit
{
CurrentActor.remove();
}
- if (!complete && !_sub.isSuspended())
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
{
- getQueue().execute(this);
+ if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ {
+ getQueue().execute(this);
+ }
}
-
}
- finally
- {
- Thread.currentThread().setName(originalName);
- }
-
}
private SimpleAMQQueue getQueue()
@@ -93,4 +99,18 @@ class SubFlushRunner implements ReadWrit
{
return true;
}
+
+ public String toString()
+ {
+ return "SubFlushRunner-" + _sub.getLogActor();
+ }
+
+ public void execute(Executor executor)
+ {
+ _stateChange.set(true);
+ if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ {
+ executor.execute(this);
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Thu Dec 15 13:54:36 2011
@@ -73,13 +73,18 @@ public interface Subscription
void close();
- void send(QueueEntry msg) throws AMQException;
+ void send(QueueEntry entry, boolean batch) throws AMQException;
+
+ void flushBatched();
void queueDeleted(AMQQueue queue);
boolean wouldSuspend(QueueEntry msg);
+ boolean trySendLock();
+
+
void getSendLock();
void releaseSendLock();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu Dec 15 13:54:36 2011
@@ -119,11 +119,13 @@ public abstract class SubscriptionImpl i
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
- * @param msg The message to send
+ *
+ * @param entry
+ * @param batch
* @throws AMQException
*/
@Override
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -131,7 +133,7 @@ public abstract class SubscriptionImpl i
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(msg, deliveryTag);
+ sendToClient(entry, deliveryTag);
}
}
@@ -173,11 +175,13 @@ public abstract class SubscriptionImpl i
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ *
* @param entry The message to send
+ * @param batch
* @throws AMQException
*/
@Override
- public void send(QueueEntry entry) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -193,6 +197,7 @@ public abstract class SubscriptionImpl i
synchronized (getChannel())
{
+ getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
sendToClient(entry, deliveryTag);
@@ -263,11 +268,13 @@ public abstract class SubscriptionImpl i
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ *
* @param entry The message to send
+ * @param batch
* @throws AMQException
*/
@Override
- public void send(QueueEntry entry) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
// if we do not need to wait for client acknowledgements
@@ -282,6 +289,7 @@ public abstract class SubscriptionImpl i
synchronized (getChannel())
{
+ getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
@@ -441,10 +449,12 @@ public abstract class SubscriptionImpl i
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
- * @param msg The message to send
+ *
+ * @param entry
+ * @param batch
* @throws AMQException
*/
- abstract public void send(QueueEntry msg) throws AMQException;
+ abstract public void send(QueueEntry entry, boolean batch) throws AMQException;
public boolean isSuspended()
@@ -578,6 +588,11 @@ public abstract class SubscriptionImpl i
return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage());
}
+ public boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
public void getSendLock()
{
_stateChangeLock.lock();
@@ -814,4 +829,11 @@ public abstract class SubscriptionImpl i
{
return _createTime;
}
+
+ public void flushBatched()
+ {
+ _channel.getProtocolSession().setDeferFlush(false);
+
+ _channel.getProtocolSession().flushBatched();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Thu Dec 15 13:54:36 2011
@@ -61,6 +61,7 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.framing.AMQShortString;
@@ -91,6 +92,8 @@ public class Subscription_0_10 implement
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+ private static final Option[] BATCHED = new Option[] { Option.BATCH };
+
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
@@ -127,6 +130,8 @@ public class Subscription_0_10 implement
private final long _createTime = System.currentTimeMillis();
private final AtomicLong _deliveredCount = new AtomicLong(0);
private final Map<String, Object> _arguments;
+ private int _deferredMessageCredit;
+ private long _deferredSizeCredit;
public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
@@ -137,6 +142,7 @@ public class Subscription_0_10 implement
{
_subscriptionID = subscriptionId;
_session = session;
+ _postIdSettingAction = new AddMessageDispositionListenerAction(session);
_destination = destination;
_acceptMode = acceptMode;
_acquireMode = acquireMode;
@@ -325,10 +331,26 @@ public class Subscription_0_10 implement
}
- private class AddMessageDispositionListnerAction implements Runnable
+ public static class AddMessageDispositionListenerAction implements Runnable
{
- public MessageTransfer _xfr;
- public ServerSession.MessageDispositionChangeListener _action;
+ private MessageTransfer _xfr;
+ private ServerSession.MessageDispositionChangeListener _action;
+ private ServerSession _session;
+
+ public AddMessageDispositionListenerAction(ServerSession session)
+ {
+ _session = session;
+ }
+
+ public void setXfr(MessageTransfer xfr)
+ {
+ _xfr = xfr;
+ }
+
+ public void setAction(ServerSession.MessageDispositionChangeListener action)
+ {
+ _action = action;
+ }
public void run()
{
@@ -339,9 +361,9 @@ public class Subscription_0_10 implement
}
}
- private final AddMessageDispositionListnerAction _postIdSettingAction = new AddMessageDispositionListnerAction();
+ private final AddMessageDispositionListenerAction _postIdSettingAction;
- public void send(final QueueEntry entry) throws AMQException
+ public void send(final QueueEntry entry, boolean batch) throws AMQException
{
ServerMessage serverMsg = entry.getMessage();
@@ -586,26 +608,27 @@ public class Subscription_0_10 implement
{
public void onComplete(Method method)
{
- restoreCredit(entry);
+ deferredAddCredit(1, entry.getSize());
}
});
}
- _postIdSettingAction._xfr = xfr;
+ _postIdSettingAction.setXfr(xfr);
if(_acceptMode == MessageAcceptMode.EXPLICIT)
{
- _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
+ _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
}
else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
- _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
+ _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
}
else
{
- _postIdSettingAction._action = null;
+ _postIdSettingAction.setAction(null);
}
+
_session.sendMessage(xfr, _postIdSettingAction);
entry.incrementDeliveryCount();
_deliveredCount.incrementAndGet();
@@ -723,6 +746,11 @@ public class Subscription_0_10 implement
return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
}
+ public boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
public void getSendLock()
{
_stateChangeLock.lock();
@@ -788,6 +816,28 @@ public class Subscription_0_10 implement
return _properties.get(key);
}
+ private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
+ {
+ _deferredMessageCredit += deferredMessageCredit;
+ _deferredSizeCredit += deferredSizeCredit;
+
+ }
+
+ public void flushCreditState()
+ {
+ flushCreditState(false);
+ }
+ public void flushCreditState(boolean strict)
+ {
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ {
+ _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+ _deferredMessageCredit = 0;
+ _deferredSizeCredit = 0l;
+ }
+ }
public FlowCreditManager_0_10 getCreditManager()
{
@@ -890,6 +940,7 @@ public class Subscription_0_10 implement
public void flush() throws AMQException
{
+ flushCreditState(true);
_queue.flushSubscription(this);
stop();
}
@@ -1029,4 +1080,9 @@ public class Subscription_0_10 implement
return (LogSubject) this;
}
+
+ public void flushBatched()
+ {
+ _session.getConnection().flush();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Thu Dec 15 13:54:36 2011
@@ -446,6 +446,14 @@ public class ServerConnection extends Co
}
}
+ public void receivedComplete()
+ {
+ for (Session ssn : getChannels())
+ {
+ ((ServerSession)ssn).flushCreditState();
+ }
+ }
+
@Override
public ManagedObject getManagedObject()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Dec 15 13:54:36 2011
@@ -697,6 +697,15 @@ public class ServerSession extends Sessi
}
}
+ public void flushCreditState()
+ {
+ final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+ for (Subscription_0_10 subscription_0_10 : subscriptions)
+ {
+ subscription_0_10.flushCreditState(false);
+ }
+ }
+
public int getUnacknowledgedMessageCount()
{
return _messageDispositionListenerMap.size();
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Thu Dec 15 13:54:36 2011
@@ -43,6 +43,8 @@ import org.apache.qpid.server.output.Pro
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TestNetworkConnection;
@@ -120,6 +122,11 @@ public class InternalTestProtocolSession
{
}
+ public ClientDeliveryMethod createDeliveryMethod(int channelId)
+ {
+ return new InternalWriteDeliverMethod(channelId);
+ }
+
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
}
@@ -213,4 +220,42 @@ public class InternalTestProtocolSession
((AMQChannel)session).getProtocolSession().closeSession();
}
+
+ private class InternalWriteDeliverMethod implements ClientDeliveryMethod
+ {
+ private int _channelId;
+
+ public InternalWriteDeliverMethod(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+
+ @Override
+ public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
+ {
+ _deliveryCount.incrementAndGet();
+
+ synchronized (_channelDelivers)
+ {
+ Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+
+ if (consumers == null)
+ {
+ consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ _channelDelivers.put(_channelId, consumers);
+ }
+
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getConsumerTag());
+
+ if (consumerDelivers == null)
+ {
+ consumerDelivers = new LinkedList<DeliveryPair>();
+ consumers.put(sub.getConsumerTag(), consumerDelivers);
+ }
+
+ consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+ }
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Thu Dec 15 13:54:36 2011
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstracti
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
@@ -143,16 +144,19 @@ public class AckTest extends InternalBro
qs.add(_queue);
msg.enqueue(qs);
MessageMetaData mmd = msg.headersReceived();
- msg.setStoredMessage(_messageStore.addMessage(mmd));
+ final StoredMessage storedMessage = _messageStore.addMessage(mmd);
+ msg.setStoredMessage(storedMessage);
+ final AMQMessage message = new AMQMessage(storedMessage);
if(msg.allContentReceived())
{
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, msg, new ServerTransaction.Action() {
+ txn.enqueue(_queue, message, new ServerTransaction.Action() {
public void postCommit()
{
try
{
- _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
+
+ _queue.enqueue(message);
}
catch (AMQException e)
{
@@ -170,6 +174,15 @@ public class AckTest extends InternalBro
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
}
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
}
/**
@@ -181,9 +194,8 @@ public class AckTest extends InternalBro
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount, true);
-
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertEquals("",msgCount,map.size());
+ assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -206,7 +218,6 @@ public class AckTest extends InternalBro
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
final int msgCount = 10;
publishMessages(msgCount);
-
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
assertTrue(_messageStore.getMessageCount() == 0);
@@ -243,7 +254,7 @@ public class AckTest extends InternalBro
_channel.acknowledgeMessage(5, false);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount - 1);
+ assertEquals("Map not expected size",msgCount - 1,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -270,6 +281,8 @@ public class AckTest extends InternalBro
final int msgCount = 10;
publishMessages(msgCount);
+
+
_channel.acknowledgeMessage(5, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Thu Dec 15 13:54:36 2011
@@ -44,7 +44,6 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
@@ -189,6 +188,13 @@ public class SimpleAMQQueueTest extends
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch(InterruptedException e)
+ {
+ }
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
@@ -430,6 +436,13 @@ public class SimpleAMQQueueTest extends
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ }
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
@@ -723,7 +736,7 @@ public class SimpleAMQQueueTest extends
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
// call processQueue to deliver the messages
- testQueue.processQueue(new QueueRunner(testQueue, 1)
+ testQueue.processQueue(new QueueRunner(testQueue)
{
@Override
public void run()
@@ -826,7 +839,7 @@ public class SimpleAMQQueueTest extends
/**
* Tests that dequeued message is not copied as part of invocation of
- * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)}
+ * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)}
*/
public void testCopyMessagesWithDequeuedEntry()
{
@@ -844,7 +857,7 @@ public class SimpleAMQQueueTest extends
SimpleAMQQueue queue = createQueue(anotherQueueName);
// create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
// copy messages into another queue
_queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -876,7 +889,7 @@ public class SimpleAMQQueueTest extends
/**
* Tests that dequeued message is not moved as part of invocation of
- * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)}
+ * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)}
*/
public void testMovedMessagesWithDequeuedEntry()
{
@@ -894,7 +907,7 @@ public class SimpleAMQQueueTest extends
SimpleAMQQueue queue = createQueue(anotherQueueName);
// create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
// move messages into another queue
_queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
@@ -927,7 +940,7 @@ public class SimpleAMQQueueTest extends
/**
* Tests that messages in given range including dequeued one are deleted
* from the queue on invocation of
- * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)}
+ * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)}
*/
public void testRemoveMessagesFromQueueWithDequeuedEntry()
{
@@ -954,7 +967,7 @@ public class SimpleAMQQueueTest extends
/**
* Tests that dequeued message on the top is not accounted and next message
* is deleted from the queue on invocation of
- * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)}
+ * {@link SimpleAMQQueue#deleteMessageFromTop()}
*/
public void testDeleteMessageFromTopWithDequeuedEntryOnTop()
{
@@ -983,7 +996,7 @@ public class SimpleAMQQueueTest extends
/**
* Tests that all messages including dequeued one are deleted from the queue
- * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)}
+ * on invocation of {@link SimpleAMQQueue#clearQueue()}
*/
public void testClearQueueWithDequeuedEntry()
{
@@ -1049,10 +1062,12 @@ public class SimpleAMQQueueTest extends
{
/**
* Send a message and decrement latch
+ * @param entry
+ * @param batch
*/
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
- super.send(msg);
+ super.send(entry, batch);
latch.countDown();
}
};
@@ -1063,7 +1078,7 @@ public class SimpleAMQQueueTest extends
testQueue.registerSubscription(subscription, false);
// process queue
- testQueue.processQueue(new QueueRunner(testQueue, 1)
+ testQueue.processQueue(new QueueRunner(testQueue)
{
public void run()
{
@@ -1127,6 +1142,19 @@ public class SimpleAMQQueueTest extends
{
return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0);
}
+
+ @Override
+ public boolean acquire(Subscription sub)
+ {
+ if(((AMQMessage) message).getMessageId().longValue() % 2 == 0)
+ {
+ return false;
+ }
+ else
+ {
+ return super.acquire(sub);
+ }
+ }
};
}
};
@@ -1243,6 +1271,14 @@ public class SimpleAMQQueueTest extends
fail("Failure to put message on queue:" + e.getMessage());
}
}
+ try
+ {
+ Thread.sleep(2000L);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
}
/**
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Thu Dec 15 13:54:36 2011
@@ -125,6 +125,12 @@ public class MockSubscription implements
return queue;
}
+ public boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+
public void getSendLock()
{
_stateChangeLock.lock();
@@ -216,7 +222,7 @@ public class MockSubscription implements
{
}
- public void send(QueueEntry entry) throws AMQException
+ public void send(QueueEntry entry, boolean batch) throws AMQException
{
if (messages.contains(entry))
{
@@ -225,6 +231,12 @@ public class MockSubscription implements
messages.add(entry);
}
+ @Override
+ public void flushBatched()
+ {
+
+ }
+
public void setQueueContext(AMQQueue.Context queueContext)
{
_queueContext = queueContext;
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Thu Dec 15 13:54:36 2011
@@ -56,7 +56,7 @@ public class FailoverBehaviourTest exten
private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
/** Default number of messages to send before failover */
- private static final int DEFAULT_NUMBER_OF_MESSAGES = 10;
+ private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;
/** Actual number of messages to send before failover */
protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
@@ -1157,7 +1157,6 @@ public class FailoverBehaviourTest exten
{
init(acknowledgeMode, false);
_consumer.close();
- QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
_connection.start();
produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
@@ -1165,6 +1164,8 @@ public class FailoverBehaviourTest exten
{
_producerSession.commit();
}
+
+ QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
return browser;
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=1214760&r1=1214759&r2=1214760&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Thu Dec 15 13:54:36 2011
@@ -54,7 +54,14 @@ public class AMQConnectionTest extends Q
_topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
_queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
}
-
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
protected void createConnection() throws Exception
{
_connection = (AMQConnection) getConnection("guest", "guest");
@@ -67,16 +74,27 @@ public class AMQConnectionTest extends Q
public void testCreateQueueSession() throws JMSException
{
- _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE);
+ createQueueSession();
+ }
+
+ private void createQueueSession() throws JMSException
+ {
+ _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE);
}
public void testCreateTopicSession() throws JMSException
{
+ createTopicSession();
+ }
+
+ private void createTopicSession() throws JMSException
+ {
_topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
}
public void testTopicSessionCreateBrowser() throws JMSException
{
+ createTopicSession();
try
{
_topicSession.createBrowser(_queue);
@@ -94,6 +112,7 @@ public class AMQConnectionTest extends Q
public void testTopicSessionCreateQueue() throws JMSException
{
+ createTopicSession();
try
{
_topicSession.createQueue("abc");
@@ -111,6 +130,7 @@ public class AMQConnectionTest extends Q
public void testTopicSessionCreateTemporaryQueue() throws JMSException
{
+ createTopicSession();
try
{
_topicSession.createTemporaryQueue();
@@ -128,6 +148,7 @@ public class AMQConnectionTest extends Q
public void testQueueSessionCreateTemporaryTopic() throws JMSException
{
+ createQueueSession();
try
{
_queueSession.createTemporaryTopic();
@@ -145,6 +166,7 @@ public class AMQConnectionTest extends Q
public void testQueueSessionCreateTopic() throws JMSException
{
+ createQueueSession();
try
{
_queueSession.createTopic("abc");
@@ -162,6 +184,7 @@ public class AMQConnectionTest extends Q
public void testQueueSessionDurableSubscriber() throws JMSException
{
+ createQueueSession();
try
{
_queueSession.createDurableSubscriber(_topic, "abc");
@@ -179,6 +202,7 @@ public class AMQConnectionTest extends Q
public void testQueueSessionUnsubscribe() throws JMSException
{
+ createQueueSession();
try
{
_queueSession.unsubscribe("abc");
@@ -243,25 +267,6 @@ public class AMQConnectionTest extends Q
assertNotNull("Consumer B should have received the message",msg);
}
- public void testGetChannelID() throws Exception
- {
- long maxChannelID = _connection.getMaximumChannelCount();
- if (isBroker010())
- {
- //Usable numbers are 0 to N-1 when using 0-10
- //and 1 to N for 0-8/0-9
- maxChannelID = maxChannelID-1;
- }
- for (int j = 0; j < 3; j++)
- {
- int i = isBroker010() ? 0 : 1;
- for ( ; i <= maxChannelID; i++)
- {
- int id = _connection.getNextChannelID();
- assertEquals("Unexpected number on iteration "+j, i, id);
- _connection.deregisterSession(id);
- }
- }
- }
+
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org