You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/21 11:53:29 UTC

svn commit: r1696917 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/util/ common/src/main/java/org/apache/q...

Author: rgodfrey
Date: Fri Aug 21 09:53:29 2015
New Revision: 1696917

URL: http://svn.apache.org/r1696917
Log:
QPID-6662 : Improve performance by no longer using soft refs

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java   (with props)
Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Aug 21 09:53:29 2015
@@ -699,7 +699,7 @@ public abstract class AbstractBDBMessage
             StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
             if(storedMessage instanceof StoredBDBMessage)
             {
-                postActions.add(((StoredBDBMessage) storedMessage).store(txn));
+                ((StoredBDBMessage) storedMessage).store(txn);
             }
         }
 
@@ -973,37 +973,40 @@ public abstract class AbstractBDBMessage
 
 
     }
-    private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T>
+    private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
     {
 
-        public MessageDataSoftRef(final T metadata, Collection<QpidByteBuffer> data)
+        private T _metaData;
+        private volatile Collection<QpidByteBuffer> _data;
+
+        private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
         {
-            super(new MessageData<T>(metadata, data));
+            _metaData = metaData;
+            _data = data;
         }
 
         @Override
         public T getMetaData()
         {
-            MessageData<T> ref = get();
-            return ref == null ? null : ref.getMetaData();
+            return _metaData;
         }
 
         @Override
         public Collection<QpidByteBuffer> getData()
         {
-            MessageData<T> ref = get();
-
-            return ref == null ? null : ref.getData();
+            return _data;
         }
 
         @Override
         public void setData(final Collection<QpidByteBuffer> data)
         {
-            MessageData<T> ref = get();
-            if(ref != null)
-            {
-                ref.setData(data);
-            }
+            _data = data;
+        }
+
+        public void clear()
+        {
+            _metaData = null;
+            _data = null;
         }
 
         @Override
@@ -1166,7 +1169,7 @@ public abstract class AbstractBDBMessage
             return content;
         }
 
-        synchronized Runnable store(Transaction txn)
+        synchronized void store(Transaction txn)
         {
             if (!stored())
             {
@@ -1180,43 +1183,12 @@ public abstract class AbstractBDBMessage
 
                 MessageDataRef<T> hardRef = _messageDataRef;
                 MessageDataSoftRef<T> messageDataSoftRef;
-                MessageData<T> ref;
-                do
-                {
-                    messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
-                    ref = messageDataSoftRef.get();
-                }
-                while (ref == null);
 
-                _messageDataRef = messageDataSoftRef;
+                messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
 
-                class Pointer implements Runnable
-                {
-                    private MessageData<T> _ref;
+                _messageDataRef = messageDataSoftRef;
 
-                    Pointer(final MessageData<T> ref)
-                    {
-                        _ref = ref;
-                    }
 
-                    @Override
-                    public void run()
-                    {
-                        _ref = null;
-                    }
-                }
-                return new Pointer(ref);
-            }
-            else
-            {
-                return new Runnable()
-                {
-
-                    @Override
-                    public void run()
-                    {
-                    }
-                };
             }
         }
 
@@ -1330,7 +1302,7 @@ public abstract class AbstractBDBMessage
                     @Override
                     public void run()
                     {
-                        _postCommitActions.add(storedMessage.store(_txn));
+                        storedMessage.store(_txn);
                         _storeSizeIncrease += contentSize;
                     }
                 });

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug 21 09:53:29 2015
@@ -43,6 +43,7 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.util.StateChangeListenerEntry;
 
 public abstract class QueueEntryImpl implements QueueEntry
 {
@@ -63,13 +64,13 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile Set<StateChangeListener<? super QueueEntry, State>> _stateChangeListeners;
+    private volatile StateChangeListenerEntry<? super QueueEntry, State> _stateChangeListeners;
 
     private static final
-        AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+        AtomicReferenceFieldUpdater<QueueEntryImpl, StateChangeListenerEntry>
                 _listenersUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
+        (QueueEntryImpl.class, StateChangeListenerEntry.class, "_stateChangeListeners");
 
 
     private static final
@@ -98,7 +99,7 @@ public abstract class QueueEntryImpl imp
 
     public QueueEntryImpl(QueueEntryList queueEntryList)
     {
-        this(queueEntryList,null,Long.MIN_VALUE, null);
+        this(queueEntryList, null, Long.MIN_VALUE, null);
         _state = DELETED_STATE;
     }
 
@@ -221,7 +222,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquire(ConsumerImpl sub)
     {
-        final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
+        final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getLockedState());
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -414,9 +415,15 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener<? super QueueEntry, State> l : _stateChangeListeners)
+        StateChangeListenerEntry<? super QueueEntry, State> entry = _listenersUpdater.get(this);
+        while(entry != null)
         {
-            l.stateChanged(this, oldState, newState);
+            StateChangeListener<? super QueueEntry, State> l = entry.getListener();
+            if(l != null)
+            {
+                l.stateChanged(this, oldState, newState);
+            }
+            entry = entry.next();
         }
     }
 
@@ -499,24 +506,20 @@ public abstract class QueueEntryImpl imp
 
     public void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener)
     {
-        Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
-        if(listeners == null)
+        StateChangeListenerEntry<? super QueueEntry, State> entry = new StateChangeListenerEntry<>(listener);
+        if(!_listenersUpdater.compareAndSet(this,null, entry))
         {
-            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super QueueEntry, State>>());
-            listeners = _stateChangeListeners;
+            _listenersUpdater.get(this).add(entry);
         }
-
-        listeners.add(listener);
     }
 
     public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener)
     {
-        Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
-        if(listeners != null)
+        StateChangeListenerEntry entry = _listenersUpdater.get(this);
+        if(entry != null)
         {
-            return listeners.remove(listener);
+            return entry.remove(listener);
         }
-
         return false;
     }
 

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java?rev=1696917&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java Fri Aug 21 09:53:29 2015
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+public class StateChangeListenerEntry<T, E extends Enum>
+{
+    private static final AtomicReferenceFieldUpdater<StateChangeListenerEntry, StateChangeListenerEntry> NEXT =
+            AtomicReferenceFieldUpdater.newUpdater(StateChangeListenerEntry.class, StateChangeListenerEntry.class, "_next");
+
+    private volatile StateChangeListenerEntry<T, E> _next;
+    private volatile StateChangeListener<T,E> _listener;
+
+    public StateChangeListenerEntry(final StateChangeListener<T, E> listener)
+    {
+        _listener = listener;
+    }
+
+    public StateChangeListener<T, E> getListener()
+    {
+        return _listener;
+    }
+
+    public StateChangeListenerEntry<T, E> next()
+    {
+        return (StateChangeListenerEntry<T, E>) NEXT.get(this);
+    }
+
+    public void add(StateChangeListener<T,E> listener)
+    {
+        add(new StateChangeListenerEntry<>(listener));
+    }
+
+    public void add(final StateChangeListenerEntry<T, E> entry)
+    {
+        if(!entry.equals(_listener) && !NEXT.compareAndSet(this, null, entry))
+        {
+            NEXT.get(this).add(entry);
+        }
+    }
+
+    public boolean remove(final StateChangeListener<T, E> listener)
+    {
+        if(listener.equals(_listener))
+        {
+            _listener = null;
+            return true;
+        }
+        else
+        {
+            final StateChangeListenerEntry<T, E> next = next();
+            if(next != null)
+            {
+                boolean returnVal = next.remove(listener);
+                StateChangeListenerEntry<T,E> nextButOne;
+                if(next._listener == null && (nextButOne = next.next()) != null)
+                {
+                    NEXT.compareAndSet(this, next, nextButOne);
+                }
+                return returnVal;
+            }
+            else
+            {
+                return false;
+            }
+        }
+    }
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Fri Aug 21 09:53:29 2015
@@ -32,8 +32,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
@@ -53,12 +53,15 @@ public final class QpidByteBuffer
             QpidByteBuffer.class,
             "_disposed");
 
+    private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
+
     private ByteBuffer _buffer;
     private final ByteBufferRef _ref;
     @SuppressWarnings("unused")
     private volatile int _disposed;
 
     private static final ConcurrentMap<Integer, BufferPool> _pools = new ConcurrentHashMap<>();
+    private static final AtomicInteger _maxPooledBufferSize = new AtomicInteger();
 
     QpidByteBuffer(ByteBufferRef ref)
     {
@@ -461,6 +464,35 @@ public final class QpidByteBuffer
         return new QpidByteBuffer(ref);
     }
 
+    public static QpidByteBuffer allocateDirectFromPool(int size)
+    {
+        final int maxPooledBufferSize = _maxPooledBufferSize.get();
+        if(size > maxPooledBufferSize)
+        {
+            return allocateDirect(size);
+        }
+        else
+        {
+            QpidByteBuffer buf = _cachedBuffer.get();
+            if(buf == null || buf.remaining() < size)
+            {
+                if(buf != null)
+                {
+                    buf.dispose();
+                }
+                buf = allocateDirect(maxPooledBufferSize);
+            }
+            QpidByteBuffer rVal = buf.view(0,size);
+            buf.position(buf.position()+size);
+
+            _cachedBuffer.set(buf.slice());
+            buf.dispose();
+            return rVal;
+
+        }
+
+    }
+
 
     public ByteBuffer asByteBuffer()
     {
@@ -539,13 +571,21 @@ public final class QpidByteBuffer
             int currentPoolSize = pool.getSize();
             if (maxPoolSize != currentPoolSize)
             {
-                LOGGER.debug("Resizing direct pool, bufferSize : {} maxPoolSize from : {} to : ",
+                LOGGER.debug("Resizing direct pool, bufferSize : {} maxPoolSize from : {} to : {}",
                     new Object[] {bufferSize, currentPoolSize, maxPoolSize});
             }
             pool.ensureSize(maxPoolSize);
         }
         else
         {
+            int prevMax;
+            while((prevMax = _maxPooledBufferSize.get())<bufferSize)
+            {
+                if(_maxPooledBufferSize.compareAndSet(prevMax, bufferSize))
+                {
+                    break;
+                }
+            }
             LOGGER.debug("Created direct pool, bufferSize : {} maxPoolSize : {}", bufferSize, maxPoolSize);
         }
     }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Aug 21 09:53:29 2015
@@ -72,14 +72,14 @@ public class AMQFrame extends AMQDataBlo
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
-        QpidByteBuffer frameHeader = QpidByteBuffer.allocate(HEADER_SIZE);
+        QpidByteBuffer frameHeader = QpidByteBuffer.allocateDirectFromPool(HEADER_SIZE);
 
         frameHeader.put(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(frameHeader, _channel);
         EncodingUtils.writeUnsignedInteger(frameHeader, _bodyFrame.getSize());
         frameHeader.flip();
         sender.send(frameHeader);
-
+        frameHeader.dispose();
         long size = 8 + _bodyFrame.writePayload(sender);
 
         sender.send(FRAME_END_BYTE_BUFFER.duplicate());

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Aug 21 09:53:29 2015
@@ -120,11 +120,12 @@ public abstract class AMQMethodBodyImpl
     {
 
         final int size = getSize();
-        QpidByteBuffer buf = QpidByteBuffer.allocate(size);
+        QpidByteBuffer buf = QpidByteBuffer.allocateDirectFromPool(size);
         DataOutput dataOutput = buf.asDataOutput();
         writePayload(dataOutput);
         buf.flip();
         sender.send(buf);
+        buf.dispose();
         return size;
     }
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Aug 21 09:53:29 2015
@@ -107,13 +107,14 @@ public class ContentHeaderBody implement
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
-        QpidByteBuffer data = QpidByteBuffer.allocate(HEADER_SIZE);
+        QpidByteBuffer data = QpidByteBuffer.allocateDirectFromPool(HEADER_SIZE);
         EncodingUtils.writeUnsignedShort(data, CLASS_ID);
         EncodingUtils.writeUnsignedShort(data, 0);
         data.putLong(_bodySize);
         EncodingUtils.writeUnsignedShort(data, _properties.getPropertyFlags());
         data.flip();
         sender.send(data);
+        data.dispose();
         return HEADER_SIZE + _properties.writePropertyListPayload(sender);
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org