You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/03/06 13:27:49 UTC
svn commit: r750871 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Author: ritchiem
Date: Fri Mar 6 12:27:49 2009
New Revision: 750871
URL: http://svn.apache.org/viewvc?rev=750871&view=rev
Log:
Style Changes
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=750871&r1=750870&r2=750871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Mar 6 12:27:49 2009
@@ -20,26 +20,23 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.log4j.Logger;
-import java.util.Set;
import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class QueueEntryImpl implements QueueEntry
{
- /**
- * Used for debugging purposes.
- */
+ /** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
private final SimpleQueueEntryList _queueEntryList;
@@ -53,27 +50,24 @@
private volatile EntryState _state = AVAILABLE_STATE;
private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
+ AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
_stateUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, EntryState.class, "_state");
-
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, EntryState.class, "_state");
private volatile Set<StateChangeListener> _stateChangeListeners;
private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
- _listenersUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
-
+ AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+ _listenersUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
private static final
- AtomicLongFieldUpdater<QueueEntryImpl>
+ AtomicLongFieldUpdater<QueueEntryImpl>
_entryIdUpdater =
- AtomicLongFieldUpdater.newUpdater
- (QueueEntryImpl.class, "_entryId");
-
+ AtomicLongFieldUpdater.newUpdater
+ (QueueEntryImpl.class, "_entryId");
private volatile long _entryId;
@@ -90,17 +84,15 @@
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
- this(queueEntryList,null,Long.MIN_VALUE);
+ this(queueEntryList, null, Long.MIN_VALUE);
_state = DELETED_STATE;
}
-
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
{
- this(queueEntryList,message);
+ this(queueEntryList, message);
_entryIdUpdater.set(this, entryId);
}
@@ -113,8 +105,8 @@
{
_messageId = message.getMessageId();
_messageSize = message.getSize();
-
- if(message.isImmediate())
+
+ if (message.isImmediate())
{
_flags |= IMMEDIATE;
}
@@ -193,8 +185,8 @@
private boolean acquire(final EntryState state)
{
- boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
- if(acquired && _stateChangeListeners != null)
+ boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
+ if (acquired && _stateChangeListeners != null)
{
notifyStateChange(State.AVAILABLE, State.ACQUIRED);
}
@@ -220,24 +212,23 @@
public void release()
{
- _stateUpdater.set(this,AVAILABLE_STATE);
+ _stateUpdater.set(this, AVAILABLE_STATE);
}
public String debugIdentity()
{
- String entry="[State:"+_state.getState().name()+"]";
+ String entry = "[State:" + _state.getState().name() + "]";
if (_message == null)
{
- return entry+"(Message Unloaded ID:" + _messageId +")";
+ return entry + "(Message Unloaded ID:" + _messageId + ")";
}
else
{
- return entry+_message.debugIdentity();
+ return entry + _message.debugIdentity();
}
}
-
- public boolean immediateAndNotDelivered()
+ public boolean immediateAndNotDelivered()
{
return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
}
@@ -266,15 +257,15 @@
public Subscription getDeliveredSubscription()
{
- EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
- {
- return ((SubscriptionAcquiredState) state).getSubscription();
- }
- else
- {
- return null;
- }
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ return ((SubscriptionAcquiredState) state).getSubscription();
+ }
+ else
+ {
+ return null;
+ }
}
@@ -301,7 +292,7 @@
}
public boolean isRejectedBy(Subscription subscription)
- {
+ {
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
@@ -313,11 +304,10 @@
}
}
-
public void requeue(final StoreContext storeContext) throws AMQException
{
getQueue().requeue(storeContext, this);
- if(_stateChangeListeners != null)
+ if (_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
}
@@ -327,7 +317,7 @@
{
EntryState state = _state;
- if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+ if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
if (state instanceof SubscriptionAcquiredState)
{
@@ -348,7 +338,7 @@
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for (StateChangeListener l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -373,7 +363,7 @@
public void addStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
- if(listeners == null)
+ if (listeners == null)
{
_listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
listeners = _stateChangeListeners;
@@ -385,7 +375,7 @@
public boolean removeStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
- if(listeners != null)
+ if (listeners != null)
{
return listeners.remove(listener);
}
@@ -402,21 +392,23 @@
{
_backingStore.unload(_message);
- if(_log.isDebugEnabled())
+ if (_log.isDebugEnabled())
{
_log.debug("Unloaded:" + debugIdentity());
}
-
_message = null;
- //Update the memoryState if this load call resulted in the message being purged from memory
+
+ //Update the memoryState if this load call resulted in the message being purged from memory
if (!_flowed.getAndSet(true))
{
_queueEntryList.entryUnloadedUpdateMemory(this);
}
- } catch (UnableToFlowMessageException utfme) {
+ }
+ catch (UnableToFlowMessageException utfme)
+ {
// There is no recovery needed as the memory states remain unchanged.
- if(_log.isDebugEnabled())
+ if (_log.isDebugEnabled())
{
_log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
}
@@ -430,7 +422,7 @@
{
_message = _backingStore.load(_messageId);
- if(_log.isDebugEnabled())
+ if (_log.isDebugEnabled())
{
_log.debug("Loaded:" + debugIdentity());
}
@@ -448,10 +440,9 @@
return _flowed.get();
}
-
public int compareTo(final QueueEntry o)
{
- QueueEntryImpl other = (QueueEntryImpl)o;
+ QueueEntryImpl other = (QueueEntryImpl) o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
@@ -459,13 +450,13 @@
{
QueueEntryImpl next = nextNode();
- while(next != null && next.isDeleted())
+ while (next != null && next.isDeleted())
{
final QueueEntryImpl newNext = next.nextNode();
- if(newNext != null)
+ if (newNext != null)
{
- SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext);
next = nextNode();
}
else
@@ -491,7 +482,7 @@
{
EntryState state = _state;
- if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
+ if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE))
{
_queueEntryList.advanceHead();
if (_backingStore != null)
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=750871&r1=750870&r2=750871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Mar 6 12:27:49 2009
@@ -415,7 +415,7 @@
//Check the queue is still within its limits.
long current = _queue.getMemoryUsageCurrent();
- assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() ,
+ assertTrue("Queue has gone over quota:" + current + "/" + _queue.getMemoryUsageMaximum(),
current <= _queue.getMemoryUsageMaximum());
assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
@@ -428,14 +428,14 @@
}
}
- public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+ public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
MESSAGE_SIZE = 1;
/** Set to larger than the purge batch size. Default 100.
- * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
+ * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
long MEMORY_MAX = 500;
int MESSAGE_COUNT = (int) MEMORY_MAX;
//Set the Memory Usage to be very low
@@ -454,7 +454,7 @@
// Send another and ensure we are flowed
sendMessage(txnContext);
assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
- assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
_queue.setMemoryUsageMaximum(0L);
@@ -469,7 +469,7 @@
}
assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
- assertEquals(0L , _queue.getMemoryUsageCurrent());
+ assertEquals(0L, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
}
@@ -511,6 +511,7 @@
assertNotNull(data);
}
+
// FIXME: move this to somewhere useful
private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org