You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/21 11:24:29 UTC
svn commit: r1765997 - in /qpid/java/branches/transfer-queue: ./
broker-core/src/main/java/org/apache/qpid/server/stats/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/...
Author: rgodfrey
Date: Fri Oct 21 11:24:29 2016
New Revision: 1765997
URL: http://svn.apache.org/viewvc?rev=1765997&view=rev
Log:
merge up to r1765852 from trunk
Modified:
qpid/java/branches/transfer-queue/ (props changed)
qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
Propchange: qpid/java/branches/transfer-queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 11:24:29 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1763546-1765634
+/qpid/java/trunk:1763546-1765852
/qpid/trunk/qpid:796646-796653
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java?rev=1765997&r1=1765996&r2=1765997&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java Fri Oct 21 11:24:29 2016
@@ -61,52 +61,52 @@ public class StatisticsCounter
private static final class Sample
{
private final long _sampleId;
- private final AtomicLong _count = new AtomicLong();
- private final AtomicLong _total;
- private final long _peak;
- private final long _lastRate;
+ private final AtomicLong _sampleTotal = new AtomicLong();
+ private final AtomicLong _cumulativeTotal;
+ private final long _peakTotal;
+ private final long _previousSampleTotal;
private final long _start;
private final long _period;
private Sample(final long period)
{
_period = period;
- _total = new AtomicLong();
- _peak = 0L;
- _lastRate = 0L;
+ _cumulativeTotal = new AtomicLong();
+ _peakTotal = 0L;
+ _previousSampleTotal = 0L;
_start = System.currentTimeMillis();
- _sampleId = _start / period;
+ _sampleId = 0;
}
private Sample(final long timestamp, Sample priorSample)
{
_period = priorSample._period;
- _sampleId = timestamp / _period;
- _total = priorSample._total;
- _peak = priorSample.getRate() > priorSample.getPeak() ? priorSample.getRate() : priorSample.getPeak();
- _lastRate = priorSample.getRate();
+ _cumulativeTotal = priorSample._cumulativeTotal;
+ _peakTotal = priorSample.getSampleTotal() > priorSample.getPeakSampleTotal() ? priorSample.getSampleTotal() : priorSample.getPeakSampleTotal();
+ _previousSampleTotal = priorSample.getSampleTotal();
_start = priorSample._start;
+ _sampleId = (timestamp - _start) / _period;
}
- public long getTotal()
+ public long getCumulativeTotal()
{
- return _total.get();
+ return _cumulativeTotal.get();
}
- public long getRate()
+ public long getSampleTotal()
{
- return _count.get();
+ return _sampleTotal.get();
}
- public long getPeak()
+ public long getPeakSampleTotal()
{
- return _peak;
+ return _peakTotal;
}
- public long getLastRate()
+ public long getPreviousSampleTotal()
{
- return _lastRate;
+ return _previousSampleTotal;
}
public long getStart()
@@ -118,15 +118,15 @@ public class StatisticsCounter
{
if(timestamp >= _start)
{
- long eventSampleId = timestamp / _period;
+ long eventSampleId = (timestamp - _start) / _period;
if(eventSampleId > _sampleId)
{
return false;
}
- _total.addAndGet(value);
+ _cumulativeTotal.addAndGet(value);
if(eventSampleId == _sampleId)
{
- _count.addAndGet(value);
+ _sampleTotal.addAndGet(value);
}
return true;
}
@@ -179,7 +179,7 @@ public class StatisticsCounter
public double getPeak()
{
update();
- return (double) getSample().getPeak() / ((double) _period / 1000.0d);
+ return (double) getSample().getPeakSampleTotal() / ((double) _period / 1000.0d);
}
private Sample getSample()
@@ -190,12 +190,12 @@ public class StatisticsCounter
public double getRate()
{
update();
- return (double) getSample().getLastRate() / ((double) _period / 1000.0d);
+ return (double) getSample().getPreviousSampleTotal() / ((double) _period / 1000.0d);
}
public long getTotal()
{
- return getSample().getTotal();
+ return getSample().getCumulativeTotal();
}
public long getStart()
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1765997&r1=1765996&r2=1765997&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Fri Oct 21 11:24:29 2016
@@ -22,16 +22,20 @@ package org.apache.qpid.server.transport
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.transport.TransportException;
public class NetworkConnectionScheduler
@@ -204,7 +208,31 @@ public class NetworkConnectionScheduler
public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
{
- _selectorThread.cancelAcceptingSocket(serverSocket);
+ Future<Void> result = cancelAcceptingSocketAsync(serverSocket);
+ try
+ {
+ result.get(Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT),
+ TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket was interrupted");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket failed", e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket timed out");
+ }
+ }
+
+ private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel serverSocket)
+ {
+ return _selectorThread.cancelAcceptingSocket(serverSocket);
}
public void addConnection(final SchedulableConnection connection)
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1765997&r1=1765996&r2=1765997&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Oct 21 11:24:29 2016
@@ -36,10 +36,12 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -431,8 +433,9 @@ class SelectorThread extends Thread
_selectionTasks[0].wakeup();
}
- public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+ public Future<Void> cancelAcceptingSocket(final ServerSocketChannel socketChannel)
{
+ final SettableFuture<Void> cancellationResult = SettableFuture.create();
_tasks.add(new Runnable()
{
@Override
@@ -443,14 +446,33 @@ class SelectorThread extends Thread
LOGGER.debug("Cancelling selector on accepting port {} ",
socketChannel.socket().getLocalSocketAddress());
}
- SelectionKey selectionKey = socketChannel.keyFor(_selectionTasks[0].getSelector());
- if (selectionKey != null)
+
+ try
+ {
+ SelectionKey selectionKey = null;
+ try
+ {
+ selectionKey = socketChannel.register(_selectionTasks[0].getSelector(), 0);
+ }
+ catch (ClosedChannelException e)
+ {
+ LOGGER.error("Failed to deregister selector on accepting port {}",
+ socketChannel.socket().getLocalSocketAddress(), e);
+ }
+
+ if (selectionKey != null)
+ {
+ selectionKey.cancel();
+ }
+ }
+ finally
{
- selectionKey.cancel();
+ cancellationResult.set(null);
}
}
});
_selectionTasks[0].wakeup();
+ return cancellationResult;
}
@Override
Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1765997&r1=1765996&r2=1765997&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Fri Oct 21 11:24:29 2016
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -31,10 +35,6 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Struct;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
public class MessageMetaData_0_10 implements StorableMessageMetaData
{
private Header _header;
@@ -83,7 +83,7 @@ public class MessageMetaData_0_10 implem
return TYPE;
}
- public int getStorableSize()
+ public synchronized int getStorableSize()
{
QpidByteBuffer buf = _encoded;
@@ -93,13 +93,12 @@ public class MessageMetaData_0_10 implem
_encoded = buf;
}
- //TODO -- need to add stuff
return buf.limit();
}
private QpidByteBuffer encodeAsBuffer()
{
- ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE);
+ ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE, false);
encoder.writeInt64(_arrivalTime);
encoder.writeInt32(_bodySize);
@@ -141,28 +140,18 @@ public class MessageMetaData_0_10 implem
return buf;
}
- public int writeToBuffer(QpidByteBuffer dest)
+ public synchronized int writeToBuffer(QpidByteBuffer dest)
{
- QpidByteBuffer buf = _encoded;
-
- if(buf == null)
+ if (_encoded == null)
{
- buf = encodeAsBuffer();
- _encoded = buf;
+ _encoded = encodeAsBuffer();
}
-
- buf = buf.duplicate();
-
- buf.position(0);
-
- if(dest.remaining() < buf.limit())
- {
- buf.limit(dest.remaining());
- }
- dest.put(buf);
- final int length = buf.limit();
- buf.dispose();
- return length;
+ dest.put(_encoded);
+ final int bytesWritten = _encoded.limit();
+ // We have special knowledge that we no longer need the encoded form after this call
+ // to reduce memory usage associated with the metadata free the encoded form here (QPID-7465)
+ clearEncodedForm();
+ return bytesWritten;
}
public int getContentSize()
@@ -178,13 +167,17 @@ public class MessageMetaData_0_10 implem
@Override
public void dispose()
{
-
+ clearEncodedForm();
}
@Override
- public void clearEncodedForm()
+ public synchronized void clearEncodedForm()
{
-
+ if (_encoded != null)
+ {
+ _encoded.dispose();
+ _encoded = null;
+ }
}
public String getRoutingKey()
Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java?rev=1765997&r1=1765996&r2=1765997&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java Fri Oct 21 11:24:29 2016
@@ -30,21 +30,23 @@ import org.apache.qpid.transport.codec.A
public final class ServerEncoder extends AbstractEncoder
{
- public static final int DEFAULT_CAPACITY = 8192;
+ public static final int DEFAULT_CAPACITY = 256 * 1024;
+ private final boolean _useDirectMemory;
private final int _threshold;
private QpidByteBuffer _out;
private int _initialCapacity;
public ServerEncoder()
{
- this(DEFAULT_CAPACITY);
+ this(QpidByteBuffer.getPooledBufferSize(), true);
}
- public ServerEncoder(int capacity)
+ public ServerEncoder(int capacity, boolean useDirectMemory)
{
- _initialCapacity = capacity;
- _threshold = capacity/16;
- _out = QpidByteBuffer.allocateDirect(capacity);
+ _useDirectMemory = useDirectMemory;
+ _initialCapacity = (capacity > 0 ? capacity : DEFAULT_CAPACITY);
+ _threshold = Math.min(_initialCapacity/16, 256);
+ _out = QpidByteBuffer.allocate(useDirectMemory, _initialCapacity);
}
public void init()
@@ -52,7 +54,7 @@ public final class ServerEncoder extends
if(_out.capacity() < _threshold)
{
_out.dispose();
- _out = QpidByteBuffer.allocateDirect(_initialCapacity);
+ _out = QpidByteBuffer.allocate(_useDirectMemory, _initialCapacity);
}
else
{
@@ -81,7 +83,7 @@ public final class ServerEncoder extends
{
QpidByteBuffer old = _out;
int capacity = old.capacity();
- _out = QpidByteBuffer.allocateDirect(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
+ _out = QpidByteBuffer.allocate(_useDirectMemory, Math.max(Math.max(capacity + size, 2 * capacity), _initialCapacity));
old.flip();
_out.put(old);
old.dispose();
Modified: qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1765997&r1=1765996&r2=1765997&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Fri Oct 21 11:24:29 2016
@@ -804,6 +804,11 @@ public class QpidByteBuffer
_isPoolInitialized = true;
}
+ public static int getPooledBufferSize()
+ {
+ return _pooledBufferSize;
+ }
+
private static final class BufferInputStream extends InputStream
{
private final QpidByteBuffer _qpidByteBuffer;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org