You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/08/17 17:31:57 UTC
svn commit: r1696307 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/security/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-core/src/main/java/org/apa...
Author: kwall
Date: Mon Aug 17 15:31:57 2015
New Revision: 1696307
URL: http://svn.apache.org/r1696307
Log:
QPID-6662 : Introduce simple pooling mechanism where direct byte buffers are reused when their wrapper objects are finalised
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
- copied, changed from r1695939, qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/RuntimeDefault.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Mon Aug 17 15:31:57 2015
@@ -21,6 +21,8 @@
package org.apache.qpid.server.model;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
@@ -127,6 +129,10 @@ public interface Broker<X extends Broker
@ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE)
int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+ String BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES = "broker.directByteBufferPoolSizes";
+ @ManagedContextDefault(name = BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES)
+ Map<String, Integer> DEFAULT_BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES = Collections.singletonMap("", 1024);
+
@ManagedAttribute(validValues = {"org.apache.qpid.server.model.adapter.BrokerAdapter#getAvailableConfigurationEncrypters()"})
String getConfidentialConfigurationEncryptionProvider();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java Mon Aug 17 15:31:57 2015
@@ -40,6 +40,8 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -785,7 +787,24 @@ public class ConfiguredObjectTypeRegistr
Object value = field.get(null);
if(!_defaultContext.containsKey(name))
{
- _defaultContext.put(name,String.valueOf(value));
+ final String stringValue;
+ if (value instanceof Collection || value instanceof Map)
+ {
+ try
+ {
+ stringValue = new ObjectMapper().writeValueAsString(value);
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new ServerScopedRuntimeException("Unable to convert value of type '" + value.getClass()
+ + "' to a JSON string for conext variable ${"+name+"}");
+ }
+ }
+ else
+ {
+ stringValue = String.valueOf(value);
+ }
+ _defaultContext.put(name, stringValue);
}
else
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java Mon Aug 17 15:31:57 2015
@@ -27,7 +27,6 @@ import org.apache.qpid.server.model.KeyS
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.RuntimeDefault;
@ManagedObject( category = false, type = "FileKeyStore" )
public interface FileKeyStore<X extends FileKeyStore<X>> extends KeyStore<X>
@@ -39,25 +38,10 @@ public interface FileKeyStore<X extends
String STORE_URL = "storeUrl";
@ManagedContextDefault(name = "keyStoreFile.keyStoreType")
- RuntimeDefault<String> DEFAULT_KEYSTORE_TYPE =
- new RuntimeDefault<String>()
- {
- @Override
- public String value()
- {
- return java.security.KeyStore.getDefaultType();
- }
- };
+ String DEFAULT_KEYSTORE_TYPE = java.security.KeyStore.getDefaultType();
+
@ManagedContextDefault(name = "keyStoreFile.keyManagerFactoryAlgorithm")
- RuntimeDefault<String> DEFAULT_KEY_MANAGER_FACTORY_ALGORITHM =
- new RuntimeDefault<String>()
- {
- @Override
- public String value()
- {
- return KeyManagerFactory.getDefaultAlgorithm();
- }
- };
+ String DEFAULT_KEY_MANAGER_FACTORY_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
@ManagedAttribute(defaultValue = "${this:path}")
String getDescription();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java Mon Aug 17 15:31:57 2015
@@ -26,7 +26,6 @@ import org.apache.qpid.server.model.Deri
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.RuntimeDefault;
import org.apache.qpid.server.model.TrustStore;
@ManagedObject( category = false, type = "FileTrustStore" )
@@ -38,26 +37,10 @@ public interface FileTrustStore<X extend
String PASSWORD = "password";
String STORE_URL = "storeUrl";
@ManagedContextDefault(name = "trustStoreFile.trustStoreType")
- RuntimeDefault<String> DEFAULT_TRUSTSTORE_TYPE =
- new RuntimeDefault<String>()
- {
- @Override
- public String value()
- {
- return java.security.KeyStore.getDefaultType();
- }
- };
- @ManagedContextDefault(name = "trustStoreFile.trustManagerFactoryAlgorithm")
- RuntimeDefault<String> DEFAULT_TRUST_MANAGER_FACTORY_ALGORITHM =
- new RuntimeDefault<String>()
- {
- @Override
- public String value()
- {
- return KeyManagerFactory.getDefaultAlgorithm();
- }
- };
+ String DEFAULT_TRUSTSTORE_TYPE = java.security.KeyStore.getDefaultType();
+ @ManagedContextDefault(name = "trustStoreFile.trustManagerFactoryAlgorithm")
+ String DEFAULT_TRUST_MANAGER_FACTORY_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
@ManagedAttribute(defaultValue = "${this:storeUrl}")
String getDescription();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Aug 17 15:31:57 2015
@@ -64,6 +64,7 @@ public class NonBlockingConnection imple
private volatile boolean _fullyWritten = true;
private boolean _partialRead = false;
+
private final AmqpPort _port;
public NonBlockingConnection(SocketChannel socketChannel,
@@ -125,6 +126,11 @@ public class NonBlockingConnection imple
return _socketChannel;
}
+ public AmqpPort getPort()
+ {
+ return _port;
+ }
+
@Override
public void start()
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Mon Aug 17 15:31:57 2015
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.util.QpidByteBufferUtils;
public class NonBlockingConnectionPlainDelegate implements NonBlockingConnectionDelegate
{
@@ -40,7 +41,9 @@ public class NonBlockingConnectionPlainD
public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent)
{
_parent = parent;
- _netInputBuffer = QpidByteBuffer.allocateDirect(parent.getReceiveBufferSize());
+ final int receiveBufferSize = parent.getReceiveBufferSize();
+ QpidByteBufferUtils.createPool(parent.getPort(), receiveBufferSize);
+ _netInputBuffer = QpidByteBuffer.allocateDirect(receiveBufferSize);
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Mon Aug 17 15:31:57 2015
@@ -20,7 +20,9 @@
package org.apache.qpid.server.transport;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.util.QpidByteBufferUtils;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDelegate
{
@@ -58,13 +61,24 @@ public class NonBlockingConnectionTLSDel
{
_parent = parent;
_sslEngine = createSSLEngine(port);
- _netInputBuffer = QpidByteBuffer.allocateDirect(Math.max(parent.getReceiveBufferSize(),
- _sslEngine.getSession().getPacketBufferSize()));
+
+
+ final int bufSize = Math.max(parent.getReceiveBufferSize(),
+ _sslEngine.getSession().getPacketBufferSize());
+ QpidByteBufferUtils.createPool(port, bufSize);
+ _netInputBuffer = QpidByteBuffer.allocateDirect(bufSize);
_initialApplicationBufferSize =
Math.max(_sslEngine.getSession().getApplicationBufferSize() + 50, _parent.getReceiveBufferSize());
+
+
+
+ QpidByteBufferUtils.createPool(port, _initialApplicationBufferSize);
+
_applicationBuffer = QpidByteBuffer.allocateDirect(_initialApplicationBufferSize);
+ QpidByteBufferUtils.createPool(port, _sslEngine.getSession().getPacketBufferSize());
+
}
@Override
@@ -172,7 +186,7 @@ public class NonBlockingConnectionTLSDel
if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
{
final QpidByteBuffer netBuffer = QpidByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize());
- _status = QpidByteBuffer.encryptSSL(_sslEngine,bufferArray, netBuffer);
+ _status = QpidByteBuffer.encryptSSL(_sslEngine, bufferArray, netBuffer);
runSSLEngineTasks(_status);
netBuffer.flip();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Mon Aug 17 15:31:57 2015
@@ -31,7 +31,6 @@ import java.util.Collection;
public class NonBlockingConnectionUndecidedDelegate implements NonBlockingConnectionDelegate
{
private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
- private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0);
public final NonBlockingConnection _parent;
private QpidByteBuffer _netInputBuffer;
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java?rev=1696307&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java Mon Aug 17 15:31:57 2015
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.util;
+
+import java.util.Map;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+
+public class QpidByteBufferUtils
+{
+ public static final int DEFAULT_MAX_POOLSIZE = 1024;
+
+ public static void createPool(ConfiguredObject object, int bufferSize)
+ {
+ Map<String, Integer> bufferPoolSizes =
+ (Map<String, Integer>) object.getContextValue(Map.class, Broker.BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES);
+
+ int fallbackMaxPoolSize = bufferPoolSizes != null && bufferPoolSizes.containsKey("") ? bufferPoolSizes.get("") : DEFAULT_MAX_POOLSIZE;
+ final String bufferSizeKey = String.valueOf(bufferSize);
+ int maxPoolSize = bufferPoolSizes != null && bufferPoolSizes.containsKey(bufferSizeKey) ? bufferPoolSizes.get(bufferSizeKey) : fallbackMaxPoolSize;
+
+ QpidByteBuffer.createPool(bufferSize, maxPoolSize);
+
+ }
+
+}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java Mon Aug 17 15:31:57 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import java.nio.ByteBuffer;
import java.util.List;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
@@ -55,11 +54,11 @@ public final class ServerDecoder extends
}
else
{
- return readAsNativeByteBuffer(size);
+ return readAsQpidByteBuffer(size);
}
}
- private QpidByteBuffer readAsNativeByteBuffer(int len)
+ private QpidByteBuffer readAsQpidByteBuffer(int len)
{
QpidByteBuffer currentBuffer = getCurrentBuffer();
if(currentBuffer.remaining()>=len)
@@ -71,7 +70,7 @@ public final class ServerDecoder extends
}
else
{
- QpidByteBuffer dest = currentBuffer.isDirect() ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
+ QpidByteBuffer dest = QpidByteBuffer.allocate(len);
while(dest.hasRemaining() && available()>0)
{
advanceIfNecessary();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java Mon Aug 17 15:31:57 2015
@@ -37,7 +37,7 @@ import org.apache.qpid.transport.Segment
public class ServerInputHandler implements FrameSizeObserver
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerInputHandler.class);
- private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0);
+ private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocateDirect(0);
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
Copied: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java (from r1695939, qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java?p2=qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java&r1=1695939&r2=1696307&rev=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java Mon Aug 17 15:31:57 2015
@@ -21,53 +21,43 @@
package org.apache.qpid.bytebuffer;
import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-class PooledByteBufferRef implements ByteBufferRef
+class BufferPool
{
- private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
+ private static final AtomicIntegerFieldUpdater<BufferPool> MAX_SIZE = AtomicIntegerFieldUpdater.newUpdater(BufferPool.class, "_maxSize");
+ @SuppressWarnings("unused")
+ private volatile int _maxSize;
+ private final ConcurrentLinkedQueue<ByteBuffer> _pooledBuffers = new ConcurrentLinkedQueue<>();
- private final ByteBuffer _buffer;
- private volatile int _refCount;
-
- PooledByteBufferRef(final ByteBuffer buffer)
+ BufferPool(final int maxSize)
{
- _buffer = buffer;
+ _maxSize = maxSize;
}
- @Override
- public void incrementRef()
+ ByteBuffer getBuffer()
{
-
- if(REF_COUNT.get(this) >= 0)
- {
- REF_COUNT.incrementAndGet(this);
- }
+ return _pooledBuffers.poll();
}
- @Override
- public void decrementRef()
+ void returnBuffer(ByteBuffer buf)
{
- if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
+ buf.clear();
+ if(_pooledBuffers.size() < MAX_SIZE.get(this))
{
- returnToPool(this);
+ _pooledBuffers.add(buf);
}
}
- @Override
- public ByteBuffer getBuffer()
+ void ensureSize(final int maxPoolSize)
{
- return _buffer.duplicate();
+ int currentSize;
+ while((currentSize = MAX_SIZE.get(this))<maxPoolSize && !MAX_SIZE.compareAndSet(this, currentSize, maxPoolSize));
}
- @Override
- public void removeFromPool()
+ public int getSize()
{
- REF_COUNT.set(this, Integer.MIN_VALUE/2);
+ return MAX_SIZE.get(this);
}
-
- private void returnToPool(final PooledByteBufferRef byteBufferRef)
- {
- }
-
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java Mon Aug 17 15:31:57 2015
@@ -50,7 +50,7 @@ class PooledByteBufferRef implements Byt
{
if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
{
- returnToPool(this);
+ QpidByteBuffer.returnToPool(_buffer);
}
}
@@ -66,8 +66,5 @@ class PooledByteBufferRef implements Byt
REF_COUNT.set(this, Integer.MIN_VALUE/2);
}
- private void returnToPool(final PooledByteBufferRef byteBufferRef)
- {
- }
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Mon Aug 17 15:31:57 2015
@@ -30,23 +30,34 @@ import java.nio.channels.ReadableByteCha
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.framing.AMQShortString;
public final class QpidByteBuffer
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(QpidByteBuffer.class);
+
private static final AtomicIntegerFieldUpdater<QpidByteBuffer> DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(QpidByteBuffer.class, "_disposed");
private final ByteBuffer _buffer;
private final ByteBufferRef _ref;
+ @SuppressWarnings("unused")
private volatile int _disposed;
+ private static final ConcurrentMap<Integer, BufferPool> _pools = new ConcurrentHashMap<>();
+
QpidByteBuffer(ByteBufferRef ref)
{
this(ref.getBuffer(), ref);
@@ -421,7 +432,22 @@ public final class QpidByteBuffer
public static QpidByteBuffer allocateDirect(int size)
{
- return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size)));
+ final ByteBufferRef ref;
+ BufferPool pool = _pools.get(size);
+ if(pool != null)
+ {
+ ByteBuffer buf = pool.getBuffer();
+ if(buf == null)
+ {
+ buf = ByteBuffer.allocateDirect(size);
+ }
+ ref = new PooledByteBufferRef(buf);
+ }
+ else
+ {
+ ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
+ }
+ return new QpidByteBuffer(ref);
}
@@ -488,6 +514,31 @@ public final class QpidByteBuffer
return wrap(ByteBuffer.wrap(data, offset, length));
}
+ static void returnToPool(final ByteBuffer buffer)
+ {
+ final BufferPool pool = _pools.get(buffer.capacity());
+ pool.returnBuffer(buffer);
+ }
+
+ public static void createPool(int bufferSize, int maxPoolSize)
+ {
+ final BufferPool pool = _pools.putIfAbsent(bufferSize, new BufferPool(maxPoolSize));
+ if(pool != null)
+ {
+ int currentPoolSize = pool.getSize();
+ if (maxPoolSize != currentPoolSize)
+ {
+ LOGGER.debug("Resizing direct pool, bufferSize : {} maxPoolSize from : {} to : ",
+ new Object[] {bufferSize, currentPoolSize, maxPoolSize});
+ }
+ pool.ensureSize(maxPoolSize);
+ }
+ else
+ {
+ LOGGER.debug("Created direct pool, bufferSize : {} maxPoolSize : {}", bufferSize, maxPoolSize);
+ }
+ }
+
private final class BufferInputStream extends InputStream
{
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Mon Aug 17 15:31:57 2015
@@ -30,12 +30,12 @@ import org.apache.qpid.util.BytesDataOut
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
+ private static final int HEADER_SIZE = 7;
private final int _channel;
private final AMQBody _bodyFrame;
public static final byte FRAME_END_BYTE = (byte) 0xCE;
-
public AMQFrame(final int channel, final AMQBody bodyFrame)
{
_channel = channel;
@@ -62,7 +62,6 @@ public class AMQFrame extends AMQDataBlo
buffer.writeByte(FRAME_END_BYTE);
}
- private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1);
static
{
@@ -73,7 +72,7 @@ public class AMQFrame extends AMQDataBlo
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- QpidByteBuffer frameHeader = QpidByteBuffer.allocate(7);
+ QpidByteBuffer frameHeader = QpidByteBuffer.allocate(HEADER_SIZE);
frameHeader.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(frameHeader, _channel);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Mon Aug 17 15:31:57 2015
@@ -95,10 +95,7 @@ public class BasicContentHeaderPropertie
{
if(other._headers != null)
{
- byte[] encodedHeaders = other._headers.getDataAsBytes();
-
- _headers = new FieldTable(encodedHeaders,0,encodedHeaders.length);
-
+ _headers = new FieldTable(other._headers);
}
_contentType = other._contentType;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Mon Aug 17 15:31:57 2015
@@ -38,6 +38,7 @@ public class ContentHeaderBody implement
{
public static final byte TYPE = 2;
public static final int CLASS_ID = 60;
+ private static final int HEADER_SIZE = 14;
private long _bodySize;
@@ -106,14 +107,14 @@ public class ContentHeaderBody implement
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- QpidByteBuffer data = QpidByteBuffer.allocate(14);
+ QpidByteBuffer data = QpidByteBuffer.allocate(HEADER_SIZE);
EncodingUtils.writeUnsignedShort(data, CLASS_ID);
EncodingUtils.writeUnsignedShort(data, 0);
data.putLong(_bodySize);
EncodingUtils.writeUnsignedShort(data, _properties.getPropertyFlags());
data.flip();
sender.send(data);
- return 14 + _properties.writePropertyListPayload(sender);
+ return HEADER_SIZE + _properties.writePropertyListPayload(sender);
}
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Mon Aug 17 15:31:57 2015
@@ -70,11 +70,18 @@ public class FieldTable
_strictAMQP = strictAMQP;
}
- public FieldTable(byte[] encodedForm, int offset, int length)
+ public FieldTable(FieldTable other)
{
- this(QpidByteBuffer.wrap(encodedForm,offset,length));
+ _encodedForm = other._encodedForm;
+ _encodedSize = other._encodedSize;
+ _strictAMQP = other._strictAMQP;
+ if(other._properties != null)
+ {
+ _properties = new LinkedHashMap<>(other._properties);
+ }
}
+
public FieldTable(QpidByteBuffer buffer)
{
this();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org