You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/14 15:41:15 UTC

svn commit: r774764 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/act...

Author: chirino
Date: Thu May 14 13:41:14 2009
New Revision: 774764

URL: http://svn.apache.org/viewvc?rev=774764&view=rev
Log:
Applying https://issues.apache.org/activemq/browse/AMQ-2251 patch.. Thanks Colin!

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu May 14 13:41:14 2009
@@ -34,10 +34,10 @@
     protected String name;
 
     private int priorityLevels;
-    protected int outputWindowSize = 1000;
-    protected int outputResumeThreshold = 900;
-    protected int inputWindowSize = 1000;
-    protected int inputResumeThreshold = 500;
+    protected int outputWindowSize = 1024 * 1024;
+    protected int outputResumeThreshold = 900 * 1024;
+    protected int inputWindowSize = 1024 * 1024;
+    protected int inputResumeThreshold = 512 * 1024;
     protected boolean useAsyncWriteThread = true;
     
     private IDispatcher dispatcher;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Thu May 14 13:41:14 2009
@@ -17,6 +17,8 @@
 package org.apache.activemq.broker;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Map.Entry;
@@ -41,6 +43,7 @@
     // List of persistent targets for which the message should be saved
     // when dispatch is complete:
     HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
+    SaveableQueueElement<MessageDelivery> singleTarget;
 
     long storeTracking = -1;
     BrokerDatabase store;
@@ -74,7 +77,7 @@
         return fromStore;
     }
 
-    public final void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable){
+    public final void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
         synchronized (this) {
             // Can flush of this message to the store be delayed?
             if (enableFlushDelay && !delayable) {
@@ -84,16 +87,13 @@
             // list of queues for which to save the message when dispatch is
             // finished:
             if (dispatching) {
-                if (persistentTargets == null) {
-                    persistentTargets = new HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
-                }
-                persistentTargets.put(elem.getQueueDescriptor(), elem);
+                addPersistentTarget(elem);
                 return;
             }
             // Otherwise, if it is still in the saver queue, we can add this
             // queue to the queue list:
             else if (pendingSave != null) {
-                persistentTargets.put(elem.getQueueDescriptor(), elem);
+                addPersistentTarget(elem);
                 if (!delayable) {
                     pendingSave.requestFlush();
                 }
@@ -112,15 +112,14 @@
             // then we don't need to issue a delete:
             if (dispatching || pendingSave != null) {
 
-                // Remove the queue:
-                persistentTargets.remove(queue);
                 deleted = true;
 
+                removePersistentTarget(queue);
                 // We get a save context when we place the message in the
                 // database queue. If it has been added to the queue,
                 // and we've removed the last queue, see if we can cancel
                 // the save:
-                if (pendingSave != null && persistentTargets.isEmpty()) {
+                if (pendingSave != null && !hasPersistentTargets()) {
                     if (pendingSave.cancel()) {
                         pendingSave = null;
                         if (isPersistent()) {
@@ -153,8 +152,15 @@
         return storeTracking;
     }
 
-    public Set<Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>>> getPersistentQueues() {
-        return persistentTargets.entrySet();
+    public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
+        if (singleTarget != null) {
+            ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
+            list.add(singleTarget);
+            return list;
+        } else if (persistentTargets != null) {
+            return persistentTargets.values();
+        }
+        return null;
     }
 
     public void beginStore() {
@@ -163,6 +169,40 @@
         }
     }
 
+    private final boolean hasPersistentTargets() {
+        return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
+    }
+
+    private final void removePersistentTarget(QueueDescriptor queue) {
+        if (persistentTargets != null) {
+            persistentTargets.remove(queue);
+            return;
+        }
+
+        if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
+            singleTarget = null;
+        }
+    }
+
+    private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
+        if (persistentTargets != null) {
+            persistentTargets.put(elem.getQueueDescriptor(), elem);
+            return;
+        }
+
+        if (singleTarget == null) {
+            singleTarget = elem;
+            return;
+        }
+
+        if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
+            persistentTargets = new HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
+            persistentTargets.put(elem.getQueueDescriptor(), elem);
+            persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
+            singleTarget = null;
+        }
+    }
+
     public void finishDispatch(ISourceController<?> controller) throws IOException {
         boolean firePersistListener = false;
         synchronized (this) {
@@ -170,7 +210,7 @@
             // Note that this could be the case even if the message isn't
             // persistent if a target requested that the message be spooled
             // for some other reason such as queue memory overflow.
-            if (persistentTargets != null && !persistentTargets.isEmpty()) {
+            if (hasPersistentTargets()) {
                 pendingSave = store.persistReceivedMessage(this, controller);
             }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Thu May 14 13:41:14 2009
@@ -58,6 +58,12 @@
     // subscriberQueues = new HashMap<String, IFlowQueue<MessageDelivery>>();
 
     private Mapper<Integer, MessageDelivery> partitionMapper;
+
+    private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;;
+    private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
+    // Be default we don't page out elements to disk.
+    private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+    //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
     
     private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
         public Long map(MessageDelivery element) {
@@ -65,17 +71,41 @@
         }
     };
 
+    private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            return element.getFlowLimiterSize();
+        }
+    };
+
     private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
         public boolean isPersistent(MessageDelivery elem) {
             return elem.isPersistent();
         }
 
         public boolean isPageOutPlaceHolders() {
-            return false;
+            return true;
         }
 
         public boolean isPagingEnabled() {
-            return false;
+            return DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+        }
+
+        public int getPagingInMemorySize() {
+            return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+        }
+
+        public int getDisconnectedThrottleRate() {
+            // By default don't throttle consumers when disconnected.
+            return 0;
+        }
+
+        public boolean isThrottleSourcesToMemoryLimit() {
+            // Keep the queue in memory.
+            return true;
+        }
+
+        public int getRecoveryBias() {
+            return 8;
         }
     };
 
@@ -194,8 +224,9 @@
             break;
         }
         case QueueDescriptor.SHARED_PRIORITY: {
-            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, MessageBroker.MAX_PRIORITY);
+            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, MessageBroker.MAX_PRIORITY);
             limiter.setPriorityMapper(PRIORITY_MAPPER);
+            limiter.setSizeMapper(SIZE_MAPPER);
             SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
             ret = queue;
             queue.setKeyMapper(KEY_MAPPER);
@@ -203,7 +234,13 @@
             break;
         }
         case QueueDescriptor.SHARED: {
-            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
+            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD){
+                @Override
+                public int getElementSize(MessageDelivery elem) {
+                    return elem.getFlowLimiterSize();
+                }
+            };
+            
             if (!USE_OLD_QUEUE) {
                 SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
                 sQueue.setKeyMapper(KEY_MAPPER);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Thu May 14 13:41:14 2009
@@ -59,8 +59,7 @@
     }
 
     public int getMemorySize() {
-        //return size;
-        return 1;
+        return size;
     }
 
     public int getPriority() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu May 14 13:41:14 2009
@@ -442,7 +442,7 @@
             Flow flow = new Flow("broker-" + name + "-outbound", false);
             limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
                 public int getElementSize(MessageDelivery m) {
-                    return 1;
+                    return m.getFlowLimiterSize();
                 }
             };
             queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Thu May 14 13:41:14 2009
@@ -310,7 +310,7 @@
                 Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
                 limiter = new WindowLimiter<MessageDelivery>(true, flow, 1000, 500) {
                     public int getElementSize(MessageDelivery m) {
-                        return 1;
+                        return m.getFlowLimiterSize();
                     }
                 };
                 queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Thu May 14 13:41:14 2009
@@ -22,8 +22,6 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.Set;
-import java.util.Map.Entry;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +62,7 @@
 
     private final SizeLimiter<OperationBase> storeLimiter;
     private final FlowController<OperationBase> storeController;
+    private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
 
     private final IDispatcher dispatcher;
     private Thread flushThread;
@@ -82,7 +81,7 @@
     // num scheduled for delay
     private long delayedFlushPointer = 0; // The last delayable sequence num
     // requested.
-    private final long FLUSH_DELAY_MS = 5;
+    private final long FLUSH_DELAY_MS = 10;
     private final Runnable flushDelayCallback;
 
     public interface DatabaseListener {
@@ -99,7 +98,7 @@
         this.store = store;
         this.dispatcher = dispatcher;
         this.opQueue = new LinkedNodeList<OperationBase>();
-        storeLimiter = new SizeLimiter<OperationBase>(10000, 5000) {
+        storeLimiter = new SizeLimiter<OperationBase>(FLUSH_QUEUE_SIZE, 0) {
 
             @Override
             public int getElementSize(OperationBase op) {
@@ -357,16 +356,17 @@
 
                     // If we procecessed some ops, flush and post process:
                     if (!processedQueue.isEmpty()) {
-                        
-                        //System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
-                        //Sync the store:
+
+                        if (DEBUG)
+                            System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
+                        // Sync the store:
                         store.flush();
-                        
 
                         // Post process operations
                         long release = 0;
                         for (Operation processed : processedQueue) {
                             processed.onCommit();
+                            // System.out.println("Processed" + processed);
                             release += processed.getLimiterSize();
                         }
 
@@ -619,6 +619,8 @@
         final protected AtomicBoolean cancelled = new AtomicBoolean(false);
         final protected AtomicBoolean executed = new AtomicBoolean(false);
 
+        public static final int BASE_MEM_SIZE = 20;
+
         public boolean cancel() {
             if (executePending.compareAndSet(true, false)) {
                 cancelled.set(true);
@@ -680,7 +682,7 @@
         abstract protected void doExcecute(Session session);
 
         public int getLimiterSize() {
-            return 0;
+            return BASE_MEM_SIZE;
         }
 
         public boolean isDelayable() {
@@ -722,12 +724,6 @@
         }
 
         @Override
-        public int getLimiterSize() {
-            // Might consider bumping this up to avoid too much accumulation?
-            return 0;
-        }
-
-        @Override
         protected void doExcecute(Session session) {
             try {
                 session.queueAdd(qd);
@@ -755,12 +751,6 @@
         }
 
         @Override
-        public int getLimiterSize() {
-            // Might consider bumping this up to avoid too much accumulation?
-            return 0;
-        }
-
-        @Override
         protected void doExcecute(Session session) {
             session.queueRemove(qd);
         }
@@ -787,7 +777,7 @@
         @Override
         public int getLimiterSize() {
             // Might consider bumping this up to avoid too much accumulation?
-            return 1;
+            return BASE_MEM_SIZE + 8;
         }
 
         @Override
@@ -832,6 +822,11 @@
         }
 
         @Override
+        public int getLimiterSize() {
+            return BASE_MEM_SIZE + 44;
+        }
+
+        @Override
         protected void doExcecute(Session session) {
 
             Iterator<QueueRecord> records = null;
@@ -942,7 +937,7 @@
 
         @Override
         public int getLimiterSize() {
-            return delivery.getFlowLimiterSize();
+            return delivery.getFlowLimiterSize() + BASE_MEM_SIZE + 40;
         }
 
         @Override
@@ -950,9 +945,9 @@
 
             if (singleElement == null) {
                 brokerDelivery.beginStore();
-                Set<Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>>> targets = brokerDelivery.getPersistentQueues();
+                Collection<SaveableQueueElement<MessageDelivery>> targets = brokerDelivery.getPersistentQueues();
 
-                if (!targets.isEmpty()) {
+                if (targets != null && !targets.isEmpty()) {
                     if (record == null) {
                         record = brokerDelivery.createMessageRecord();
                         if (record == null) {
@@ -962,24 +957,24 @@
                     record.setKey(brokerDelivery.getStoreTracking());
                     session.messageAdd(record);
 
-                    for (Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>> target : targets) {
+                    for (SaveableQueueElement<MessageDelivery> target : targets) {
                         try {
                             QueueRecord queueRecord = new QueueRecord();
                             queueRecord.setAttachment(null);
                             queueRecord.setMessageKey(record.getKey());
                             queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
-                            queueRecord.setQueueKey(target.getValue().getSequenceNumber());
-                            session.queueAddMessage(target.getKey(), queueRecord);
+                            queueRecord.setQueueKey(target.getSequenceNumber());
+                            session.queueAddMessage(target.getQueueDescriptor(), queueRecord);
 
                         } catch (KeyNotFoundException e) {
                             e.printStackTrace();
                         }
 
-                        if (target.getValue().requestSaveNotify()) {
+                        if (target.requestSaveNotify()) {
                             if (notifyTargets == null) {
                                 notifyTargets = new LinkedList<SaveableQueueElement<MessageDelivery>>();
                             }
-                            notifyTargets.add(target.getValue());
+                            notifyTargets.add(target);
                         }
                     }
                 } else {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java Thu May 14 13:41:14 2009
@@ -26,7 +26,7 @@
     public AbstractLimiter() {
     }
 
-    public final void addUnThrottleListener(UnThrottleListener l) {
+    public void addUnThrottleListener(UnThrottleListener l) {
         throttleListeners.add(l);
 
         if (!resuming && !getThrottled()) {
@@ -34,7 +34,7 @@
         }
     }
 
-    public final void notifyUnThrottleListeners() {
+    protected final void notifyUnThrottleListeners() {
         resuming = true;
         while (!getThrottled() && !throttleListeners.isEmpty()) {
             UnThrottleListener l = throttleListeners.remove();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Thu May 14 13:41:14 2009
@@ -221,11 +221,6 @@
     public void add(E elem, ISourceController<?> sourceController) {
         boolean ok = false;
         synchronized (mutex) {
-            // If we don't have an fc sink, then just increment the limiter.
-            if (controllable == null) {
-                limiter.add(elem);
-                return;
-            }
             if (okToAdd(elem)) {
                 ok = true;
                 if (limiter.add(elem)) {
@@ -241,7 +236,7 @@
                 }
             }
         }
-        if (ok) {
+        if (ok && controllable != null) {
             controllable.flowElemAccepted(this, elem);
         }
     }
@@ -259,12 +254,6 @@
     public boolean offer(E elem, ISourceController<?> sourceController) {
         boolean ok = false;
         synchronized (mutex) {
-            // If we don't have an fc sink, then just increment the limiter.
-            if (controllable == null) {
-                limiter.add(elem);
-                return true;
-            }
-
             if (okToAdd(elem)) {
                 if (limiter.add(elem)) {
                     blockSource(sourceController);
@@ -274,7 +263,7 @@
                 blockSource(sourceController);
             }
         }
-        if (ok) {
+        if (ok && controllable != null) {
             controllable.flowElemAccepted(this, elem);
         }
         return ok;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Thu May 14 13:41:14 2009
@@ -40,7 +40,6 @@
     protected DispatchContext dispatchContext;
     protected final Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners = new ArrayList<IPollableFlowSource.FlowReadyListener<E>>();
     private boolean notifyReady = false;
-    protected boolean dispatching = false;
     protected int dispatchPriority = 0;
     protected FlowQueueListener listener = new FlowQueueListener()
     {
@@ -62,11 +61,11 @@
         this.listener = listener;
     }
 
-    public final void add(E elem, ISourceController<?> source) {
+    public void add(E elem, ISourceController<?> source) {
         getSinkController(elem, source).add(elem, source);
     }
 
-    public final boolean offer(E elem, ISourceController<?> source) {
+    public boolean offer(E elem, ISourceController<?> source) {
         return getSinkController(elem, source).offer(elem, source);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Thu May 14 13:41:14 2009
@@ -27,6 +27,7 @@
 public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
     private final LinkedList<E> queue = new LinkedList<E>();
     private final FlowController<E> controller;
+    private boolean started = true;
 
     /**
      * Creates a flow queue that can handle multiple flows.
@@ -42,7 +43,6 @@
         super.onFlowOpened(controller);
     }
 
-
     protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
         return controller;
     }
@@ -52,7 +52,17 @@
      */
     public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
         queue.add(elem);
-        notifyReady();
+        if (started) {
+            notifyReady();
+        }
+    }
+
+    public synchronized void start() {
+        started = true;
+    }
+
+    public synchronized void stop() {
+        started = false;
     }
 
     public FlowController<E> getFlowController(Flow flow) {
@@ -60,7 +70,7 @@
     }
 
     public final boolean isDispatchReady() {
-        return !queue.isEmpty();
+        return !queue.isEmpty() && started;
     }
 
     public final boolean pollingDispatch() {
@@ -76,6 +86,10 @@
 
     public final E poll() {
         synchronized (this) {
+            if (!started) {
+                return null;
+            }
+
             E elem = queue.poll();
             // FIXME the release should really be done after dispatch.
             // doing it here saves us from having to resynchronize

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java Thu May 14 13:41:14 2009
@@ -26,20 +26,22 @@
      * @return True if the element must be persisted on enqueue.
      */
     public boolean isPersistent(E elem);
-    
+
     /**
-     * Indicated whether or not paging is enabled for this queue. 
-     * When not enabled elements are kept in memory until they are removed. When
-     * enabled 
-     * @return Whether or not paging is enabled for the queue. 
+     * Indicated whether or not paging is enabled for this queue. When not
+     * enabled elements are kept in memory until they are removed. When enabled,
+     * elements are paged to persistent storage when
+     * {@link #getPagingInMemorySize()} is exceeded();
+     * 
+     * @return Whether or not paging is enabled for the queue.
      */
     public boolean isPagingEnabled();
-        
+
     /**
-     * When paging is enabled this specifies whether the queue can keep 
-     * a place holder for paged out elements in memory. Keeping place holders
-     * in memory improves performance, but for very large queues keeping
-     * place holders can cause a significant overhead. 
+     * When paging is enabled this specifies whether the queue can keep a place
+     * holder for paged out elements in memory. Keeping place holders in memory
+     * improves performance, but for very large queues keeping place holders can
+     * cause a significant overhead.
      * 
      * This method must return false if {@link #isPagingEnabled()} is false.
      * 
@@ -47,29 +49,107 @@
      */
     public boolean isPageOutPlaceHolders();
 
-    public static final class NON_PERSISTENT_POLICY<E> implements PersistencePolicy<E>{
+    /**
+     * Sets the memory threshold after which elements are paged out to disk to
+     * conserve memory.
+     * 
+     * @return The In Memory Paging Threshold.
+     */
+    public int getPagingInMemorySize();
 
-        /* (non-Javadoc)
-         * @see org.apache.activemq.queue.PersistencePolicy#isPersistOnEnqueue(java.lang.Object)
+    /**
+     * Indicates whether sources should be throttled to the memory limit
+     * while the queue is being actively consumed. 
+     *  
+     * @return true if sources should be throttled to the memory limit while
+     * the queue is being consumed. 
+     */
+    public boolean isThrottleSourcesToMemoryLimit();
+    
+    /**
+     * When the queue is being restored this indicates the preference to
+     * allow for recovering elements from the store versus newly added
+     * elements. For example if set to 3 then the queue will try to catch up
+     * by 3 elements for each newly added one. 
+     */
+    public int getRecoveryBias();
+    
+    /**
+     * The rate at which to throttle sources when the queue is stopped or
+     * or is not actively being consumed. 
+     * 
+     * @return the rate at which to throttle source when the queue is not
+     * being consumed. A value less than or equal to 0 means no limit. 
+     */
+    public int getDisconnectedThrottleRate();
+    
+
+    public static final class NON_PERSISTENT_POLICY<E> implements PersistencePolicy<E> {
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.PersistencePolicy#isPersistOnEnqueue(java
+         * .lang.Object)
          */
         public boolean isPersistent(E elem) {
             return false;
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.activemq.queue.PersistencePolicy#isKeepElementRefences()
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.PersistencePolicy#isKeepElementRefences()
          */
         public boolean isPageOutPlaceHolders() {
             return false;
         }
 
-        /* (non-Javadoc)
+        /*
+         * (non-Javadoc)
+         * 
          * @see org.apache.activemq.queue.PersistencePolicy#isPagingEnabled()
          */
         public boolean isPagingEnabled() {
             return false;
         }
-        
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.PersistencePolicy#getPagingInMemoryThreshold
+         * ()
+         */
+        public int getPagingInMemorySize() {
+            // TODO Auto-generated method stub
+            return Integer.MAX_VALUE;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.PersistencePolicy#getDisconnectedThrottleRate()
+         */
+        public int getDisconnectedThrottleRate() {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.PersistencePolicy#isThrottleSourcesToMemoryLimit()
+         */
+        public boolean isThrottleSourcesToMemoryLimit() {
+            return true;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.PersistencePolicy#getRecoveryBias()
+         */
+        public int getRecoveryBias() {
+            return Integer.MAX_VALUE;
+        }
+
     }
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Thu May 14 13:41:14 2009
@@ -167,11 +167,13 @@
                 queue.getDescriptor().setParent(queueDescriptor.getQueueName());
                 queue.getDescriptor().setPartitionId(prio);
                 partitions.set(prio, queue);
-                onFlowOpened(queue.getFlowControler());
                 if (initialize) {
                     store.addQueue(queue.getDescriptor());
                     queue.initialize(0, 0, 0, 0);
+                    onFlowOpened(queue.getFlowControler());
                 }
+                
+                
                 if (started) {
                     queue.start();
                 }
@@ -191,14 +193,12 @@
 
     public void add(V value, ISourceController<?> source) {
         int prio = priorityMapper.map(value);
-        IQueue<K, V> partition = getPartition(prio, true);
-        partition.add(value, source);
+        getPartition(prio, true).add(value, source);
     }
 
     public boolean offer(V value, ISourceController<?> source) {
         int prio = priorityMapper.map(value);
-        IQueue<K, V> partition = getPartition(prio, true);
-        return partition.offer(value, source);
+        return getPartition(prio, true).offer(value, source);
     }
 
     public void setKeyMapper(Mapper<K, V> keyMapper) {