You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/21 11:53:29 UTC
svn commit: r1696917 - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apache/qpid/server/util/
common/src/main/java/org/apache/qpid/bytebuffer/
common/src/main/java/org/apache/qpid/framing/
Author: rgodfrey
Date: Fri Aug 21 09:53:29 2015
New Revision: 1696917
URL: http://svn.apache.org/r1696917
Log:
QPID-6662 : Improve performance by no longer using soft refs
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java (with props)
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Aug 21 09:53:29 2015
@@ -699,7 +699,7 @@ public abstract class AbstractBDBMessage
StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
if(storedMessage instanceof StoredBDBMessage)
{
- postActions.add(((StoredBDBMessage) storedMessage).store(txn));
+ ((StoredBDBMessage) storedMessage).store(txn);
}
}
@@ -973,37 +973,40 @@ public abstract class AbstractBDBMessage
}
- private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T>
+ private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
{
- public MessageDataSoftRef(final T metadata, Collection<QpidByteBuffer> data)
+ private T _metaData;
+ private volatile Collection<QpidByteBuffer> _data;
+
+ private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
{
- super(new MessageData<T>(metadata, data));
+ _metaData = metaData;
+ _data = data;
}
@Override
public T getMetaData()
{
- MessageData<T> ref = get();
- return ref == null ? null : ref.getMetaData();
+ return _metaData;
}
@Override
public Collection<QpidByteBuffer> getData()
{
- MessageData<T> ref = get();
-
- return ref == null ? null : ref.getData();
+ return _data;
}
@Override
public void setData(final Collection<QpidByteBuffer> data)
{
- MessageData<T> ref = get();
- if(ref != null)
- {
- ref.setData(data);
- }
+ _data = data;
+ }
+
+ public void clear()
+ {
+ _metaData = null;
+ _data = null;
}
@Override
@@ -1166,7 +1169,7 @@ public abstract class AbstractBDBMessage
return content;
}
- synchronized Runnable store(Transaction txn)
+ synchronized void store(Transaction txn)
{
if (!stored())
{
@@ -1180,43 +1183,12 @@ public abstract class AbstractBDBMessage
MessageDataRef<T> hardRef = _messageDataRef;
MessageDataSoftRef<T> messageDataSoftRef;
- MessageData<T> ref;
- do
- {
- messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
- ref = messageDataSoftRef.get();
- }
- while (ref == null);
- _messageDataRef = messageDataSoftRef;
+ messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
- class Pointer implements Runnable
- {
- private MessageData<T> _ref;
+ _messageDataRef = messageDataSoftRef;
- Pointer(final MessageData<T> ref)
- {
- _ref = ref;
- }
- @Override
- public void run()
- {
- _ref = null;
- }
- }
- return new Pointer(ref);
- }
- else
- {
- return new Runnable()
- {
-
- @Override
- public void run()
- {
- }
- };
}
}
@@ -1330,7 +1302,7 @@ public abstract class AbstractBDBMessage
@Override
public void run()
{
- _postCommitActions.add(storedMessage.store(_txn));
+ storedMessage.store(_txn);
_storeSizeIncrease += contentSize;
}
});
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug 21 09:53:29 2015
@@ -43,6 +43,7 @@ import org.apache.qpid.server.txn.LocalT
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.util.StateChangeListenerEntry;
public abstract class QueueEntryImpl implements QueueEntry
{
@@ -63,13 +64,13 @@ public abstract class QueueEntryImpl imp
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener<? super QueueEntry, State>> _stateChangeListeners;
+ private volatile StateChangeListenerEntry<? super QueueEntry, State> _stateChangeListeners;
private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+ AtomicReferenceFieldUpdater<QueueEntryImpl, StateChangeListenerEntry>
_listenersUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
+ (QueueEntryImpl.class, StateChangeListenerEntry.class, "_stateChangeListeners");
private static final
@@ -98,7 +99,7 @@ public abstract class QueueEntryImpl imp
public QueueEntryImpl(QueueEntryList queueEntryList)
{
- this(queueEntryList,null,Long.MIN_VALUE, null);
+ this(queueEntryList, null, Long.MIN_VALUE, null);
_state = DELETED_STATE;
}
@@ -221,7 +222,7 @@ public abstract class QueueEntryImpl imp
public boolean acquire(ConsumerImpl sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
+ final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getLockedState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -414,9 +415,15 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener<? super QueueEntry, State> l : _stateChangeListeners)
+ StateChangeListenerEntry<? super QueueEntry, State> entry = _listenersUpdater.get(this);
+ while(entry != null)
{
- l.stateChanged(this, oldState, newState);
+ StateChangeListener<? super QueueEntry, State> l = entry.getListener();
+ if(l != null)
+ {
+ l.stateChanged(this, oldState, newState);
+ }
+ entry = entry.next();
}
}
@@ -499,24 +506,20 @@ public abstract class QueueEntryImpl imp
public void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener)
{
- Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
- if(listeners == null)
+ StateChangeListenerEntry<? super QueueEntry, State> entry = new StateChangeListenerEntry<>(listener);
+ if(!_listenersUpdater.compareAndSet(this,null, entry))
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super QueueEntry, State>>());
- listeners = _stateChangeListeners;
+ _listenersUpdater.get(this).add(entry);
}
-
- listeners.add(listener);
}
public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener)
{
- Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
- if(listeners != null)
+ StateChangeListenerEntry entry = _listenersUpdater.get(this);
+ if(entry != null)
{
- return listeners.remove(listener);
+ return entry.remove(listener);
}
-
return false;
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java?rev=1696917&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java Fri Aug 21 09:53:29 2015
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A node in a singly linked list of {@link StateChangeListener}s that is
+ * maintained without locks.
+ * <p>
+ * Nodes are appended by a compare-and-set on {@code _next}. A listener is
+ * removed by nulling the node's listener reference, so code walking the list
+ * via {@link #next()} must tolerate {@link #getListener()} returning
+ * {@code null}.
+ */
+public class StateChangeListenerEntry<T, E extends Enum>
+{
+    private static final AtomicReferenceFieldUpdater<StateChangeListenerEntry, StateChangeListenerEntry> NEXT =
+            AtomicReferenceFieldUpdater.newUpdater(StateChangeListenerEntry.class, StateChangeListenerEntry.class, "_next");
+
+    private volatile StateChangeListenerEntry<T, E> _next;
+    private volatile StateChangeListener<T,E> _listener;
+
+    public StateChangeListenerEntry(final StateChangeListener<T, E> listener)
+    {
+        _listener = listener;
+    }
+
+    /**
+     * @return the listener held by this node, or {@code null} once it has been removed
+     */
+    public StateChangeListener<T, E> getListener()
+    {
+        return _listener;
+    }
+
+    /**
+     * @return the node after this one, or {@code null} if there is none
+     */
+    @SuppressWarnings("unchecked")
+    public StateChangeListenerEntry<T, E> next()
+    {
+        return (StateChangeListenerEntry<T, E>) NEXT.get(this);
+    }
+
+    /**
+     * Wraps the listener in a new node and appends it, see {@link #add(StateChangeListenerEntry)}.
+     */
+    public void add(StateChangeListener<T,E> listener)
+    {
+        add(new StateChangeListenerEntry<>(listener));
+    }
+
+    /**
+     * Appends the node to the end of the list that starts at this node, unless a node
+     * already holding an equal listener is met on the way.
+     *
+     * @param entry the node to append
+     */
+    public void add(final StateChangeListenerEntry<T, E> entry)
+    {
+        final StateChangeListener<T, E> candidate = entry.getListener();
+        StateChangeListenerEntry<T, E> node = this;
+        while(node != null)
+        {
+            // Compare listener with listener: comparing the entry itself against a
+            // listener can never match, which let duplicates through.
+            if(candidate != null && candidate.equals(node._listener))
+            {
+                return;
+            }
+            if(NEXT.compareAndSet(node, null, entry))
+            {
+                return;
+            }
+            // Lost the race for (or never was at) the tail; move on to the successor.
+            node = node.next();
+        }
+    }
+
+    /**
+     * Removes the first node, searching from this one, whose listener equals the argument
+     * by clearing that node's listener reference.
+     * <p>
+     * On the way back, a successor whose listener is cleared is unlinked, but only when it
+     * has a successor of its own; a cleared last node stays in place.
+     *
+     * @param listener the listener to remove
+     * @return {@code true} if a node holding the listener was found and cleared
+     */
+    public boolean remove(final StateChangeListener<T, E> listener)
+    {
+        if(listener.equals(_listener))
+        {
+            _listener = null;
+            return true;
+        }
+        else
+        {
+            final StateChangeListenerEntry<T, E> next = next();
+            if(next != null)
+            {
+                boolean returnVal = next.remove(listener);
+                StateChangeListenerEntry<T,E> nextButOne;
+                if(next._listener == null && (nextButOne = next.next()) != null)
+                {
+                    NEXT.compareAndSet(this, next, nextButOne);
+                }
+                return returnVal;
+            }
+            else
+            {
+                return false;
+            }
+        }
+    }
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Fri Aug 21 09:53:29 2015
@@ -32,8 +32,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
@@ -53,12 +53,15 @@ public final class QpidByteBuffer
QpidByteBuffer.class,
"_disposed");
+ private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
+
private ByteBuffer _buffer;
private final ByteBufferRef _ref;
@SuppressWarnings("unused")
private volatile int _disposed;
private static final ConcurrentMap<Integer, BufferPool> _pools = new ConcurrentHashMap<>();
+ private static final AtomicInteger _maxPooledBufferSize = new AtomicInteger();
QpidByteBuffer(ByteBufferRef ref)
{
@@ -461,6 +464,35 @@ public final class QpidByteBuffer
return new QpidByteBuffer(ref);
}
+    public static QpidByteBuffer allocateDirectFromPool(int size)
+    {
+        // Requests larger than the biggest pooled buffer size get their own buffer.
+        final int pooledSize = _maxPooledBufferSize.get();
+        if(size > pooledSize)
+        {
+            return allocateDirect(size);
+        }
+
+        // Otherwise carve the request out of this thread's cached buffer,
+        // replacing the cached buffer first if it is absent or too small.
+        QpidByteBuffer cached = _cachedBuffer.get();
+        if(cached != null && cached.remaining() < size)
+        {
+            cached.dispose();
+            cached = null;
+        }
+        if(cached == null)
+        {
+            cached = allocateDirect(pooledSize);
+        }
+
+        final QpidByteBuffer result = cached.view(0, size);
+        cached.position(cached.position() + size);
+        _cachedBuffer.set(cached.slice());
+        cached.dispose();
+        return result;
+    }
+
public ByteBuffer asByteBuffer()
{
@@ -539,13 +571,21 @@ public final class QpidByteBuffer
int currentPoolSize = pool.getSize();
if (maxPoolSize != currentPoolSize)
{
- LOGGER.debug("Resizing direct pool, bufferSize : {} maxPoolSize from : {} to : ",
+ LOGGER.debug("Resizing direct pool, bufferSize : {} maxPoolSize from : {} to : {}",
new Object[] {bufferSize, currentPoolSize, maxPoolSize});
}
pool.ensureSize(maxPoolSize);
}
else
{
+ int prevMax;
+ while((prevMax = _maxPooledBufferSize.get())<bufferSize)
+ {
+ if(_maxPooledBufferSize.compareAndSet(prevMax, bufferSize))
+ {
+ break;
+ }
+ }
LOGGER.debug("Created direct pool, bufferSize : {} maxPoolSize : {}", bufferSize, maxPoolSize);
}
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Aug 21 09:53:29 2015
@@ -72,14 +72,14 @@ public class AMQFrame extends AMQDataBlo
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- QpidByteBuffer frameHeader = QpidByteBuffer.allocate(HEADER_SIZE);
+ QpidByteBuffer frameHeader = QpidByteBuffer.allocateDirectFromPool(HEADER_SIZE);
frameHeader.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(frameHeader, _channel);
EncodingUtils.writeUnsignedInteger(frameHeader, _bodyFrame.getSize());
frameHeader.flip();
sender.send(frameHeader);
-
+ frameHeader.dispose();
long size = 8 + _bodyFrame.writePayload(sender);
sender.send(FRAME_END_BYTE_BUFFER.duplicate());
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Aug 21 09:53:29 2015
@@ -120,11 +120,12 @@ public abstract class AMQMethodBodyImpl
{
final int size = getSize();
- QpidByteBuffer buf = QpidByteBuffer.allocate(size);
+ QpidByteBuffer buf = QpidByteBuffer.allocateDirectFromPool(size);
DataOutput dataOutput = buf.asDataOutput();
writePayload(dataOutput);
buf.flip();
sender.send(buf);
+ buf.dispose();
return size;
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1696917&r1=1696916&r2=1696917&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Aug 21 09:53:29 2015
@@ -107,13 +107,14 @@ public class ContentHeaderBody implement
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- QpidByteBuffer data = QpidByteBuffer.allocate(HEADER_SIZE);
+ QpidByteBuffer data = QpidByteBuffer.allocateDirectFromPool(HEADER_SIZE);
EncodingUtils.writeUnsignedShort(data, CLASS_ID);
EncodingUtils.writeUnsignedShort(data, 0);
data.putLong(_bodySize);
EncodingUtils.writeUnsignedShort(data, _properties.getPropertyFlags());
data.flip();
sender.send(data);
+ data.dispose();
return HEADER_SIZE + _properties.writePropertyListPayload(sender);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org