You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/03/06 13:27:49 UTC

svn commit: r750871 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/QueueEntryImpl.java test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java

Author: ritchiem
Date: Fri Mar  6 12:27:49 2009
New Revision: 750871

URL: http://svn.apache.org/viewvc?rev=750871&view=rev
Log:
Style Changes

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=750871&r1=750870&r2=750871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Mar  6 12:27:49 2009
@@ -20,26 +20,23 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.log4j.Logger;
 
-import java.util.Set;
 import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 public class QueueEntryImpl implements QueueEntry
 {
 
-    /**
-     * Used for debugging purposes.
-     */
+    /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
 
     private final SimpleQueueEntryList _queueEntryList;
@@ -53,27 +50,24 @@
     private volatile EntryState _state = AVAILABLE_STATE;
 
     private static final
-        AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
+    AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
             _stateUpdater =
-        AtomicReferenceFieldUpdater.newUpdater
-        (QueueEntryImpl.class, EntryState.class, "_state");
-
+            AtomicReferenceFieldUpdater.newUpdater
+                    (QueueEntryImpl.class, EntryState.class, "_state");
 
     private volatile Set<StateChangeListener> _stateChangeListeners;
 
     private static final
-        AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
-                _listenersUpdater =
-        AtomicReferenceFieldUpdater.newUpdater
-        (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
-
+    AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+            _listenersUpdater =
+            AtomicReferenceFieldUpdater.newUpdater
+                    (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
 
     private static final
-        AtomicLongFieldUpdater<QueueEntryImpl>
+    AtomicLongFieldUpdater<QueueEntryImpl>
             _entryIdUpdater =
-        AtomicLongFieldUpdater.newUpdater
-        (QueueEntryImpl.class, "_entryId");
-
+            AtomicLongFieldUpdater.newUpdater
+                    (QueueEntryImpl.class, "_entryId");
 
     private volatile long _entryId;
 
@@ -90,17 +84,15 @@
 
     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
 
-
     QueueEntryImpl(SimpleQueueEntryList queueEntryList)
     {
-        this(queueEntryList,null,Long.MIN_VALUE);
+        this(queueEntryList, null, Long.MIN_VALUE);
         _state = DELETED_STATE;
     }
 
-
     public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
     {
-        this(queueEntryList,message);
+        this(queueEntryList, message);
 
         _entryIdUpdater.set(this, entryId);
     }
@@ -113,8 +105,8 @@
         {
             _messageId = message.getMessageId();
             _messageSize = message.getSize();
-            
-            if(message.isImmediate())
+
+            if (message.isImmediate())
             {
                 _flags |= IMMEDIATE;
             }
@@ -193,8 +185,8 @@
 
     private boolean acquire(final EntryState state)
     {
-        boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
-        if(acquired && _stateChangeListeners != null)
+        boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
+        if (acquired && _stateChangeListeners != null)
         {
             notifyStateChange(State.AVAILABLE, State.ACQUIRED);
         }
@@ -220,24 +212,23 @@
 
     public void release()
     {
-        _stateUpdater.set(this,AVAILABLE_STATE);
+        _stateUpdater.set(this, AVAILABLE_STATE);
     }
 
     public String debugIdentity()
     {
-        String entry="[State:"+_state.getState().name()+"]";
+        String entry = "[State:" + _state.getState().name() + "]";
         if (_message == null)
         {
-            return entry+"(Message Unloaded ID:" + _messageId +")";
+            return entry + "(Message Unloaded ID:" + _messageId + ")";
         }
         else
         {
-            return entry+_message.debugIdentity();
+            return entry + _message.debugIdentity();
         }
     }
 
-
-    public boolean immediateAndNotDelivered() 
+    public boolean immediateAndNotDelivered()
     {
         return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
     }
@@ -266,15 +257,15 @@
 
     public Subscription getDeliveredSubscription()
     {
-            EntryState state = _state;
-            if (state instanceof SubscriptionAcquiredState)
-            {
-                return ((SubscriptionAcquiredState) state).getSubscription();
-            }
-            else
-            {
-                return null;
-            }
+        EntryState state = _state;
+        if (state instanceof SubscriptionAcquiredState)
+        {
+            return ((SubscriptionAcquiredState) state).getSubscription();
+        }
+        else
+        {
+            return null;
+        }
 
     }
 
@@ -301,7 +292,7 @@
     }
 
     public boolean isRejectedBy(Subscription subscription)
-    {        
+    {
 
         if (_rejectedBy != null) // We have subscriptions that rejected this message
         {
@@ -313,11 +304,10 @@
         }
     }
 
-
     public void requeue(final StoreContext storeContext) throws AMQException
     {
         getQueue().requeue(storeContext, this);
-        if(_stateChangeListeners != null)
+        if (_stateChangeListeners != null)
         {
             notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
         }
@@ -327,7 +317,7 @@
     {
         EntryState state = _state;
 
-        if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+        if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
         {
             if (state instanceof SubscriptionAcquiredState)
             {
@@ -348,7 +338,7 @@
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener l : _stateChangeListeners)
+        for (StateChangeListener l : _stateChangeListeners)
         {
             l.stateChanged(this, oldState, newState);
         }
@@ -373,7 +363,7 @@
     public void addStateChangeListener(StateChangeListener listener)
     {
         Set<StateChangeListener> listeners = _stateChangeListeners;
-        if(listeners == null)
+        if (listeners == null)
         {
             _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
             listeners = _stateChangeListeners;
@@ -385,7 +375,7 @@
     public boolean removeStateChangeListener(StateChangeListener listener)
     {
         Set<StateChangeListener> listeners = _stateChangeListeners;
-        if(listeners != null)
+        if (listeners != null)
         {
             return listeners.remove(listener);
         }
@@ -402,21 +392,23 @@
             {
                 _backingStore.unload(_message);
 
-                if(_log.isDebugEnabled())
+                if (_log.isDebugEnabled())
                 {
                     _log.debug("Unloaded:" + debugIdentity());
                 }
-
                 _message = null;
-                //Update the memoryState if this load call resulted in the message being purged from memory                
+
+                //Update the memoryState if this load call resulted in the message being purged from memory
                 if (!_flowed.getAndSet(true))
                 {
                     _queueEntryList.entryUnloadedUpdateMemory(this);
                 }
 
-            } catch (UnableToFlowMessageException utfme) {
+            }
+            catch (UnableToFlowMessageException utfme)
+            {
                 // There is no recovery needed as the memory states remain unchanged.
-                if(_log.isDebugEnabled())
+                if (_log.isDebugEnabled())
                 {
                     _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
                 }
@@ -430,7 +422,7 @@
         {
             _message = _backingStore.load(_messageId);
 
-            if(_log.isDebugEnabled())
+            if (_log.isDebugEnabled())
             {
                 _log.debug("Loaded:" + debugIdentity());
             }
@@ -448,10 +440,9 @@
         return _flowed.get();
     }
 
-
     public int compareTo(final QueueEntry o)
     {
-        QueueEntryImpl other = (QueueEntryImpl)o;
+        QueueEntryImpl other = (QueueEntryImpl) o;
         return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
     }
 
@@ -459,13 +450,13 @@
     {
 
         QueueEntryImpl next = nextNode();
-        while(next != null && next.isDeleted())
+        while (next != null && next.isDeleted())
         {
 
             final QueueEntryImpl newNext = next.nextNode();
-            if(newNext != null)
+            if (newNext != null)
             {
-                SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+                SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext);
                 next = nextNode();
             }
             else
@@ -491,7 +482,7 @@
     {
         EntryState state = _state;
 
-        if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
+        if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE))
         {
             _queueEntryList.advanceHead();
             if (_backingStore != null)

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=750871&r1=750870&r2=750871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Mar  6 12:27:49 2009
@@ -415,7 +415,7 @@
 
         //Check the queue is still within it's limits.
         long current = _queue.getMemoryUsageCurrent();
-        assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() ,
+        assertTrue("Queue has gone over quota:" + current + "/" + _queue.getMemoryUsageMaximum(),
                    current <= _queue.getMemoryUsageMaximum());
 
         assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
@@ -428,14 +428,14 @@
         }
     }
 
-      public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+    public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
     {
         // Create IncomingMessage and nondurable queue
         NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
 
         MESSAGE_SIZE = 1;
         /** Set to larger than the purge batch size. Default 100.
-         * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */ 
+         * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
         long MEMORY_MAX = 500;
         int MESSAGE_COUNT = (int) MEMORY_MAX;
         //Set the Memory Usage to be very low
@@ -454,7 +454,7 @@
         // Send anothe and ensure we are flowed
         sendMessage(txnContext);
         assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
-        assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent());
+        assertEquals(MESSAGE_COUNT, _queue.getMemoryUsageCurrent());
         assertTrue("Queue is not flowed.", _queue.isFlowed());
 
         _queue.setMemoryUsageMaximum(0L);
@@ -469,7 +469,7 @@
         }
 
         assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
-        assertEquals(0L , _queue.getMemoryUsageCurrent());
+        assertEquals(0L, _queue.getMemoryUsageCurrent());
         assertTrue("Queue is not flowed.", _queue.isFlowed());
 
     }
@@ -511,6 +511,7 @@
         assertNotNull(data);
     }
 
+
     // FIXME: move this to somewhere useful
     private static AMQMessage createMessage(final MessagePublishInfo publishBody)
     {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org