You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text rendering.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/04/27 20:40:49 UTC

svn commit: r769099 [5/5] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java Mon Apr 27 18:40:44 2009
@@ -26,7 +26,7 @@
     @Override
     protected Store createStore() {
         KahaDBStore rc = new KahaDBStore();
-        rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance"));
+        rc.setStoreDirectory(new File("sub/test-data/kahadb-store-performance"));
         rc.setDeleteAllMessages(true);
         return rc;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java Mon Apr 27 18:40:44 2009
@@ -24,10 +24,10 @@
 public class KahaDBStoreTest extends StoreTestBase {
 
     @Override
-    protected Store createStore() {
+    protected Store createStore(boolean delete) {
         KahaDBStore rc = new KahaDBStore();
-        rc.setStoreDirectory(new File("target/test-data/kahadb-store-test"));
-        rc.setDeleteAllMessages(true);
+        rc.setStoreDirectory(new File("sub/test-data/kahadb-store-test"));
+        rc.setDeleteAllMessages(delete);
         return rc;
     }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java Mon Apr 27 18:40:44 2009
@@ -22,7 +22,7 @@
 public class MemoryStoreTest extends StoreTestBase {
 
     @Override
-    protected Store createStore() {
+    protected Store createStore(boolean delete) {
         return new MemoryStore();
     }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Mon Apr 27 18:40:44 2009
@@ -293,6 +293,10 @@
         return outputQueue;
     }
 
+    public final boolean hasSelector() {
+        return false;
+    }
+
     public boolean match(Message message) {
         return true;
     }
@@ -365,7 +369,6 @@
         }
 
         public void addCommand(Object object) {
-            boolean notify = false;
             synchronized (outputQueue) {
                 outputCommandQueue.add(object);
                 notifyTransport();
@@ -521,7 +524,7 @@
 
         private float targetRate = 200000;
         private final int quantum = 10000;
-        private final float lambda = .0001f;
+        //private final float lambda = .0001f;
         boolean increase = true;
 
         private static final boolean DEBUG = false;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Mon Apr 27 18:40:44 2009
@@ -22,7 +22,7 @@
 import org.apache.activemq.flow.Commands.Message.MessageBean;
 import org.apache.activemq.flow.Commands.Message.MessageBuffer;
 import org.apache.activemq.protobuf.UTF8Buffer;
-import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.util.Mapper;
 
 public class Message implements Serializable {
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Mon Apr 27 18:40:44 2009
@@ -35,6 +35,11 @@
     public interface DeliveryTarget {
         public IFlowSink<Message> getSink();
 
+        /**
+         * @return true if this sub has a selector
+         */
+        public boolean hasSelector();
+        
         public boolean match(Message message);
     }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Mon Apr 27 18:40:44 2009
@@ -26,11 +26,11 @@
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.util.Mapper;
 
 public class MockBrokerTest extends TestCase {
 
-    protected static final int PERFORMANCE_SAMPLES = 5;
+    protected static final int PERFORMANCE_SAMPLES = 3;
     protected static final int SAMPLING_FREQUENCY = 5;
 
     protected static final int FANIN_COUNT = 10;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Mon Apr 27 18:40:44 2009
@@ -8,11 +8,12 @@
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Mapper;
 import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SharedPriorityQueue;
 import org.apache.activemq.queue.SharedQueue;
 import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.util.Mapper;
 
 class MockQueue implements MockBroker.DeliveryTarget {
 
@@ -23,22 +24,21 @@
 
     private Mapper<Integer, Message> partitionMapper;
     private Mapper<Long, Message> keyExtractor;
+    private final MockStoreAdapater store = new MockStoreAdapater();
 
     private IQueue<Long, Message> createQueue() {
 
         if (partitionMapper != null) {
-            PartitionedQueue<Integer, Long, Message> queue = new PartitionedQueue<Integer, Long, Message>() {
+            PartitionedQueue<Long, Message> queue = new PartitionedQueue<Long, Message>(destination.getName().toString()) {
                 @Override
-                protected IQueue<Long, Message> cratePartition(Integer partitionKey) {
+                public IQueue<Long, Message> createPartition(int partitionKey) {
                     return createSharedFlowQueue();
                 }
-
-                public boolean isElementPersistent(Message message) {
-                    return false;
-                }
             };
             queue.setPartitionMapper(partitionMapper);
             queue.setResourceName(destination.getName().toString());
+            queue.setStore(store);
+            queue.initialize(0, 0, 0, 0);
             return queue;
         } else {
             return createSharedFlowQueue();
@@ -53,6 +53,8 @@
             queue.setKeyMapper(keyExtractor);
             queue.setAutoRelease(true);
             queue.setDispatcher(broker.getDispatcher());
+            queue.setStore(store);
+            queue.initialize(0, 0, 0, 0);
             return queue;
         } else {
             SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
@@ -60,6 +62,8 @@
             queue.setKeyMapper(keyExtractor);
             queue.setAutoRelease(true);
             queue.setDispatcher(broker.getDispatcher());
+            queue.setStore(store);
+            queue.initialize(0, 0, 0, 0);
             return queue;
         }
     }
@@ -77,6 +81,10 @@
             public boolean isPreAcquired() {
                 return true;
             }
+            
+            public boolean isBrowser() {
+                return false;
+            }
 
             public boolean matches(Message message) {
                 return dt.match(message);
@@ -94,6 +102,14 @@
             public String toString() {
                 return getSink().toString();
             }
+
+            public boolean hasSelector() {
+                return dt.hasSelector();
+            }
+
+            public boolean offer(Message elem, ISourceController<Message> controller, SubscriptionDeliveryCallback ackCallback) {
+                return getSink().offer(elem, controller);
+            }
         };
         subs.put(dt, sub);
         queue.addSubscription(sub);
@@ -109,6 +125,7 @@
 
     public void start() throws Exception {
         queue = createQueue();
+        queue.start();
     }
 
     public void stop() throws Exception {
@@ -118,6 +135,10 @@
         return queue;
     }
 
+    public boolean hasSelector() {
+        return false;
+    }
+
     public boolean match(Message message) {
         return true;
     }
@@ -150,4 +171,40 @@
         this.destination = destination;
     }
 
+    static final class MockStoreAdapater implements QueueStore<Long, Message> {
+
+        MockStoreAdapater() {
+
+        }
+
+        public final void deleteQueueElement(QueueStore.QueueDescriptor descriptor, Message elem) {
+
+        }
+
+        public final boolean isElemPersistent(Message elem) {
+            return false;
+        }
+
+        public final boolean isFromStore(Message elem) {
+            return false;
+        }
+
+        public final void persistQueueElement(QueueStore.QueueDescriptor descriptor, ISourceController<?> controller, Message elem, long sequence, boolean delayable) throws Exception {
+            // Noop;
+        }
+
+        public final void restoreQueueElements(QueueStore.QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount, QueueStore.RestoreListener<Message> listener) {
+            throw new UnsupportedOperationException("Mock broker doesn't support persistence");
+        }
+
+        public final void addQueue(QueueStore.QueueDescriptor queue) {
+
+        }
+
+        public final void deleteQueue(QueueStore.QueueDescriptor queue) {
+
+        }
+
+    }
+
 }
\ No newline at end of file