You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/08/17 17:31:57 UTC

svn commit: r1696307 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/security/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-core/src/main/java/org/apa...

Author: kwall
Date: Mon Aug 17 15:31:57 2015
New Revision: 1696307

URL: http://svn.apache.org/r1696307
Log:
QPID-6662 : Introduce simple pooling mechanism where direct byte buffers are reused when their wrapper objects are finalised

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
      - copied, changed from r1695939, qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/RuntimeDefault.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Mon Aug 17 15:31:57 2015
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.model;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
@@ -127,6 +129,10 @@ public interface Broker<X extends Broker
     @ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE)
     int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
 
+    String BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES = "broker.directByteBufferPoolSizes";
+    @ManagedContextDefault(name = BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES)
+    Map<String, Integer> DEFAULT_BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES = Collections.singletonMap("", 1024);
+
     @ManagedAttribute(validValues = {"org.apache.qpid.server.model.adapter.BrokerAdapter#getAvailableConfigurationEncrypters()"})
     String getConfidentialConfigurationEncryptionProvider();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java Mon Aug 17 15:31:57 2015
@@ -40,6 +40,8 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -785,7 +787,24 @@ public class ConfiguredObjectTypeRegistr
                     Object value = field.get(null);
                     if(!_defaultContext.containsKey(name))
                     {
-                        _defaultContext.put(name,String.valueOf(value));
+                        final String stringValue;
+                        if (value instanceof Collection || value instanceof Map)
+                        {
+                            try
+                            {
+                                stringValue = new ObjectMapper().writeValueAsString(value);
+                            }
+                            catch (JsonProcessingException e)
+                            {
+                                throw new ServerScopedRuntimeException("Unable to convert value of type '" + value.getClass()
+                                                                       + "' to a JSON string for conext variable ${"+name+"}");
+                            }
+                        }
+                        else
+                        {
+                            stringValue = String.valueOf(value);
+                        }
+                        _defaultContext.put(name, stringValue);
                     }
                     else
                     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java Mon Aug 17 15:31:57 2015
@@ -27,7 +27,6 @@ import org.apache.qpid.server.model.KeyS
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.RuntimeDefault;
 
 @ManagedObject( category = false, type = "FileKeyStore" )
 public interface FileKeyStore<X extends FileKeyStore<X>> extends KeyStore<X>
@@ -39,25 +38,10 @@ public interface FileKeyStore<X extends
     String STORE_URL = "storeUrl";
 
     @ManagedContextDefault(name = "keyStoreFile.keyStoreType")
-    RuntimeDefault<String> DEFAULT_KEYSTORE_TYPE =
-            new RuntimeDefault<String>()
-            {
-                @Override
-                public String value()
-                {
-                    return java.security.KeyStore.getDefaultType();
-                }
-            };
+    String DEFAULT_KEYSTORE_TYPE = java.security.KeyStore.getDefaultType();
+
     @ManagedContextDefault(name = "keyStoreFile.keyManagerFactoryAlgorithm")
-    RuntimeDefault<String> DEFAULT_KEY_MANAGER_FACTORY_ALGORITHM =
-            new RuntimeDefault<String>()
-            {
-                @Override
-                public String value()
-                {
-                    return KeyManagerFactory.getDefaultAlgorithm();
-                }
-            };
+    String DEFAULT_KEY_MANAGER_FACTORY_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
 
     @ManagedAttribute(defaultValue = "${this:path}")
     String getDescription();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStore.java Mon Aug 17 15:31:57 2015
@@ -26,7 +26,6 @@ import org.apache.qpid.server.model.Deri
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.RuntimeDefault;
 import org.apache.qpid.server.model.TrustStore;
 
 @ManagedObject( category = false, type = "FileTrustStore" )
@@ -38,26 +37,10 @@ public interface FileTrustStore<X extend
     String PASSWORD = "password";
     String STORE_URL = "storeUrl";
     @ManagedContextDefault(name = "trustStoreFile.trustStoreType")
-    RuntimeDefault<String> DEFAULT_TRUSTSTORE_TYPE =
-            new RuntimeDefault<String>()
-            {
-                @Override
-                public String value()
-                {
-                    return java.security.KeyStore.getDefaultType();
-                }
-            };
-    @ManagedContextDefault(name = "trustStoreFile.trustManagerFactoryAlgorithm")
-    RuntimeDefault<String> DEFAULT_TRUST_MANAGER_FACTORY_ALGORITHM =
-            new RuntimeDefault<String>()
-            {
-                @Override
-                public String value()
-                {
-                    return KeyManagerFactory.getDefaultAlgorithm();
-                }
-            };
+    String DEFAULT_TRUSTSTORE_TYPE = java.security.KeyStore.getDefaultType();
 
+    @ManagedContextDefault(name = "trustStoreFile.trustManagerFactoryAlgorithm")
+    String DEFAULT_TRUST_MANAGER_FACTORY_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
 
     @ManagedAttribute(defaultValue = "${this:storeUrl}")
     String getDescription();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Aug 17 15:31:57 2015
@@ -64,6 +64,7 @@ public class NonBlockingConnection imple
     private volatile boolean _fullyWritten = true;
 
     private boolean _partialRead = false;
+
     private final AmqpPort _port;
 
     public NonBlockingConnection(SocketChannel socketChannel,
@@ -125,6 +126,11 @@ public class NonBlockingConnection imple
         return _socketChannel;
     }
 
+    public AmqpPort getPort()
+    {
+        return _port;
+    }
+
     @Override
     public void start()
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Mon Aug 17 15:31:57 2015
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.util.QpidByteBufferUtils;
 
 public class NonBlockingConnectionPlainDelegate implements NonBlockingConnectionDelegate
 {
@@ -40,7 +41,9 @@ public class NonBlockingConnectionPlainD
     public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent)
     {
         _parent = parent;
-        _netInputBuffer = QpidByteBuffer.allocateDirect(parent.getReceiveBufferSize());
+        final int receiveBufferSize = parent.getReceiveBufferSize();
+        QpidByteBufferUtils.createPool(parent.getPort(), receiveBufferSize);
+        _netInputBuffer = QpidByteBuffer.allocateDirect(receiveBufferSize);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Mon Aug 17 15:31:57 2015
@@ -20,7 +20,9 @@
 package org.apache.qpid.server.transport;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.util.QpidByteBufferUtils;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
 
 public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDelegate
 {
@@ -58,13 +61,24 @@ public class NonBlockingConnectionTLSDel
     {
         _parent = parent;
         _sslEngine = createSSLEngine(port);
-        _netInputBuffer = QpidByteBuffer.allocateDirect(Math.max(parent.getReceiveBufferSize(),
-                                                                 _sslEngine.getSession().getPacketBufferSize()));
+
+
+        final int bufSize = Math.max(parent.getReceiveBufferSize(),
+                                 _sslEngine.getSession().getPacketBufferSize());
+        QpidByteBufferUtils.createPool(port, bufSize);
+        _netInputBuffer = QpidByteBuffer.allocateDirect(bufSize);
 
         _initialApplicationBufferSize =
                 Math.max(_sslEngine.getSession().getApplicationBufferSize() + 50, _parent.getReceiveBufferSize());
+
+
+
+        QpidByteBufferUtils.createPool(port, _initialApplicationBufferSize);
+
         _applicationBuffer = QpidByteBuffer.allocateDirect(_initialApplicationBufferSize);
 
+        QpidByteBufferUtils.createPool(port, _sslEngine.getSession().getPacketBufferSize());
+
     }
 
     @Override
@@ -172,7 +186,7 @@ public class NonBlockingConnectionTLSDel
             if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
             {
                 final QpidByteBuffer netBuffer = QpidByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize());
-                _status = QpidByteBuffer.encryptSSL(_sslEngine,bufferArray, netBuffer);
+                _status = QpidByteBuffer.encryptSSL(_sslEngine, bufferArray, netBuffer);
                 runSSLEngineTasks(_status);
 
                 netBuffer.flip();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Mon Aug 17 15:31:57 2015
@@ -31,7 +31,6 @@ import java.util.Collection;
 public class NonBlockingConnectionUndecidedDelegate implements NonBlockingConnectionDelegate
 {
     private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
-    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0);
     public final NonBlockingConnection _parent;
 
     private QpidByteBuffer _netInputBuffer;

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java?rev=1696307&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/QpidByteBufferUtils.java Mon Aug 17 15:31:57 2015
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.util;
+
+import java.util.Map;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+
+public class QpidByteBufferUtils
+{
+    public static final int DEFAULT_MAX_POOLSIZE = 1024;
+
+    public static void createPool(ConfiguredObject object, int bufferSize)
+    {
+        Map<String, Integer> bufferPoolSizes =
+                (Map<String, Integer>) object.getContextValue(Map.class, Broker.BROKER_DIRECT_BYTE_BUFFER_POOL_SIZES);
+
+        int fallbackMaxPoolSize = bufferPoolSizes != null && bufferPoolSizes.containsKey("") ? bufferPoolSizes.get("") : DEFAULT_MAX_POOLSIZE;
+        final String bufferSizeKey = String.valueOf(bufferSize);
+        int maxPoolSize = bufferPoolSizes != null && bufferPoolSizes.containsKey(bufferSizeKey) ? bufferPoolSizes.get(bufferSizeKey) : fallbackMaxPoolSize;
+
+        QpidByteBuffer.createPool(bufferSize, maxPoolSize);
+
+    }
+
+}

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java Mon Aug 17 15:31:57 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
@@ -55,11 +54,11 @@ public final class ServerDecoder extends
         }
         else
         {
-            return readAsNativeByteBuffer(size);
+            return readAsQpidByteBuffer(size);
         }
     }
 
-    private QpidByteBuffer readAsNativeByteBuffer(int len)
+    private QpidByteBuffer readAsQpidByteBuffer(int len)
     {
         QpidByteBuffer currentBuffer = getCurrentBuffer();
         if(currentBuffer.remaining()>=len)
@@ -71,7 +70,7 @@ public final class ServerDecoder extends
         }
         else
         {
-            QpidByteBuffer dest = currentBuffer.isDirect() ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
+            QpidByteBuffer dest = QpidByteBuffer.allocate(len);
             while(dest.hasRemaining() && available()>0)
             {
                 advanceIfNecessary();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java Mon Aug 17 15:31:57 2015
@@ -37,7 +37,7 @@ import org.apache.qpid.transport.Segment
 public class ServerInputHandler implements FrameSizeObserver
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerInputHandler.class);
-    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0);
+    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocateDirect(0);
 
     private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
 

Copied: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java (from r1695939, qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java?p2=qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java&r1=1695939&r2=1696307&rev=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java Mon Aug 17 15:31:57 2015
@@ -21,53 +21,43 @@
 package org.apache.qpid.bytebuffer;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-class PooledByteBufferRef implements ByteBufferRef
+class BufferPool
 {
-    private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
+    private static final AtomicIntegerFieldUpdater<BufferPool> MAX_SIZE = AtomicIntegerFieldUpdater.newUpdater(BufferPool.class, "_maxSize");
+    @SuppressWarnings("unused")
+    private volatile int _maxSize;
+    private final ConcurrentLinkedQueue<ByteBuffer> _pooledBuffers = new ConcurrentLinkedQueue<>();
 
-    private final ByteBuffer _buffer;
-    private volatile int _refCount;
-
-    PooledByteBufferRef(final ByteBuffer buffer)
+    BufferPool(final int maxSize)
     {
-        _buffer = buffer;
+        _maxSize = maxSize;
     }
 
-    @Override
-    public void incrementRef()
+    ByteBuffer getBuffer()
     {
-
-        if(REF_COUNT.get(this) >= 0)
-        {
-            REF_COUNT.incrementAndGet(this);
-        }
+        return _pooledBuffers.poll();
     }
 
-    @Override
-    public void decrementRef()
+    void returnBuffer(ByteBuffer buf)
     {
-        if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
+        buf.clear();
+        if(_pooledBuffers.size() < MAX_SIZE.get(this))
         {
-            returnToPool(this);
+            _pooledBuffers.add(buf);
         }
     }
 
-    @Override
-    public ByteBuffer getBuffer()
+    void ensureSize(final int maxPoolSize)
     {
-        return _buffer.duplicate();
+        int currentSize;
+        while((currentSize = MAX_SIZE.get(this))<maxPoolSize && !MAX_SIZE.compareAndSet(this, currentSize, maxPoolSize));
     }
 
-    @Override
-    public void removeFromPool()
+    public int getSize()
     {
-        REF_COUNT.set(this, Integer.MIN_VALUE/2);
+        return MAX_SIZE.get(this);
     }
-
-    private void returnToPool(final PooledByteBufferRef byteBufferRef)
-    {
-    }
-
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java Mon Aug 17 15:31:57 2015
@@ -50,7 +50,7 @@ class PooledByteBufferRef implements Byt
     {
         if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
         {
-            returnToPool(this);
+            QpidByteBuffer.returnToPool(_buffer);
         }
     }
 
@@ -66,8 +66,5 @@ class PooledByteBufferRef implements Byt
         REF_COUNT.set(this, Integer.MIN_VALUE/2);
     }
 
-    private void returnToPool(final PooledByteBufferRef byteBufferRef)
-    {
-    }
 
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Mon Aug 17 15:31:57 2015
@@ -30,23 +30,34 @@ import java.nio.channels.ReadableByteCha
 import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.framing.AMQShortString;
 
 public final class QpidByteBuffer
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(QpidByteBuffer.class);
+
     private static final AtomicIntegerFieldUpdater<QpidByteBuffer> DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(QpidByteBuffer.class, "_disposed");
 
     private final ByteBuffer _buffer;
     private final ByteBufferRef _ref;
+    @SuppressWarnings("unused")
     private volatile int _disposed;
 
+    private static final ConcurrentMap<Integer, BufferPool> _pools = new ConcurrentHashMap<>();
+
     QpidByteBuffer(ByteBufferRef ref)
     {
         this(ref.getBuffer(), ref);
@@ -421,7 +432,22 @@ public final class QpidByteBuffer
 
     public static QpidByteBuffer allocateDirect(int size)
     {
-        return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size)));
+        final ByteBufferRef ref;
+        BufferPool pool = _pools.get(size);
+        if(pool != null)
+        {
+            ByteBuffer buf = pool.getBuffer();
+            if(buf == null)
+            {
+                buf = ByteBuffer.allocateDirect(size);
+            }
+            ref = new PooledByteBufferRef(buf);
+        }
+        else
+        {
+            ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
+        }
+        return new QpidByteBuffer(ref);
     }
 
 
@@ -488,6 +514,31 @@ public final class QpidByteBuffer
         return wrap(ByteBuffer.wrap(data, offset, length));
     }
 
+    static void returnToPool(final ByteBuffer buffer)
+    {
+        final BufferPool pool = _pools.get(buffer.capacity());
+        pool.returnBuffer(buffer);
+    }
+
+    public static void createPool(int bufferSize, int maxPoolSize)
+    {
+        final BufferPool pool = _pools.putIfAbsent(bufferSize, new BufferPool(maxPoolSize));
+        if(pool != null)
+        {
+            int currentPoolSize = pool.getSize();
+            if (maxPoolSize != currentPoolSize)
+            {
+                LOGGER.debug("Resizing direct pool, bufferSize : {} maxPoolSize from : {} to : ",
+                    new Object[] {bufferSize, currentPoolSize, maxPoolSize});
+            }
+            pool.ensureSize(maxPoolSize);
+        }
+        else
+        {
+            LOGGER.debug("Created direct pool, bufferSize : {} maxPoolSize : {}", bufferSize, maxPoolSize);
+        }
+    }
+
     private final class BufferInputStream extends InputStream
     {
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Mon Aug 17 15:31:57 2015
@@ -30,12 +30,12 @@ import org.apache.qpid.util.BytesDataOut
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
+    private static final int HEADER_SIZE = 7;
     private final int _channel;
 
     private final AMQBody _bodyFrame;
     public static final byte FRAME_END_BYTE = (byte) 0xCE;
 
-
     public AMQFrame(final int channel, final AMQBody bodyFrame)
     {
         _channel = channel;
@@ -62,7 +62,6 @@ public class AMQFrame extends AMQDataBlo
         buffer.writeByte(FRAME_END_BYTE);
     }
 
-    private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
     private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1);
     static
     {
@@ -73,7 +72,7 @@ public class AMQFrame extends AMQDataBlo
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
-        QpidByteBuffer frameHeader = QpidByteBuffer.allocate(7);
+        QpidByteBuffer frameHeader = QpidByteBuffer.allocate(HEADER_SIZE);
 
         frameHeader.put(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(frameHeader, _channel);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Mon Aug 17 15:31:57 2015
@@ -95,10 +95,7 @@ public class BasicContentHeaderPropertie
     {
         if(other._headers != null)
         {
-            byte[] encodedHeaders = other._headers.getDataAsBytes();
-
-            _headers = new FieldTable(encodedHeaders,0,encodedHeaders.length);
-
+            _headers = new FieldTable(other._headers);
         }
 
         _contentType = other._contentType;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Mon Aug 17 15:31:57 2015
@@ -38,6 +38,7 @@ public class ContentHeaderBody implement
 {
     public static final byte TYPE = 2;
     public static final int CLASS_ID =  60;
+    private static final int HEADER_SIZE = 14;
 
     private long _bodySize;
 
@@ -106,14 +107,14 @@ public class ContentHeaderBody implement
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
-        QpidByteBuffer data = QpidByteBuffer.allocate(14);
+        QpidByteBuffer data = QpidByteBuffer.allocate(HEADER_SIZE);
         EncodingUtils.writeUnsignedShort(data, CLASS_ID);
         EncodingUtils.writeUnsignedShort(data, 0);
         data.putLong(_bodySize);
         EncodingUtils.writeUnsignedShort(data, _properties.getPropertyFlags());
         data.flip();
         sender.send(data);
-        return 14 + _properties.writePropertyListPayload(sender);
+        return HEADER_SIZE + _properties.writePropertyListPayload(sender);
     }
 
     public void handle(final int channelId, final AMQVersionAwareProtocolSession session)

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1696307&r1=1696306&r2=1696307&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Mon Aug 17 15:31:57 2015
@@ -70,11 +70,18 @@ public class FieldTable
         _strictAMQP = strictAMQP;
     }
 
-    public FieldTable(byte[] encodedForm, int offset, int length)
+    public FieldTable(FieldTable other)
     {
-        this(QpidByteBuffer.wrap(encodedForm,offset,length));
+        _encodedForm = other._encodedForm;
+        _encodedSize = other._encodedSize;
+        _strictAMQP = other._strictAMQP;
+        if(other._properties != null)
+        {
+            _properties = new LinkedHashMap<>(other._properties);
+        }
     }
 
+
     public FieldTable(QpidByteBuffer buffer)
     {
         this();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org