You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/04/27 20:40:49 UTC
svn commit: r769099 [5/5] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/protocol/
main/java/org/apache/activemq/broker/stomp/ main/jav...
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java Mon Apr 27 18:40:44 2009
@@ -26,7 +26,7 @@
@Override
protected Store createStore() {
KahaDBStore rc = new KahaDBStore();
- rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance"));
+ rc.setStoreDirectory(new File("sub/test-data/kahadb-store-performance"));
rc.setDeleteAllMessages(true);
return rc;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java Mon Apr 27 18:40:44 2009
@@ -24,10 +24,10 @@
public class KahaDBStoreTest extends StoreTestBase {
@Override
- protected Store createStore() {
+ protected Store createStore(boolean delete) {
KahaDBStore rc = new KahaDBStore();
- rc.setStoreDirectory(new File("target/test-data/kahadb-store-test"));
- rc.setDeleteAllMessages(true);
+ rc.setStoreDirectory(new File("sub/test-data/kahadb-store-test"));
+ rc.setDeleteAllMessages(delete);
return rc;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java Mon Apr 27 18:40:44 2009
@@ -22,7 +22,7 @@
public class MemoryStoreTest extends StoreTestBase {
@Override
- protected Store createStore() {
+ protected Store createStore(boolean delete) {
return new MemoryStore();
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Mon Apr 27 18:40:44 2009
@@ -293,6 +293,10 @@
return outputQueue;
}
+ public final boolean hasSelector() {
+ return false; // always reports that no selector is in use
+ }
+
public boolean match(Message message) {
return true;
}
@@ -365,7 +369,6 @@
}
public void addCommand(Object object) {
- boolean notify = false;
synchronized (outputQueue) {
outputCommandQueue.add(object);
notifyTransport();
@@ -521,7 +524,7 @@
private float targetRate = 200000;
private final int quantum = 10000;
- private final float lambda = .0001f;
+ //private final float lambda = .0001f;
boolean increase = true;
private static final boolean DEBUG = false;
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Mon Apr 27 18:40:44 2009
@@ -22,7 +22,7 @@
import org.apache.activemq.flow.Commands.Message.MessageBean;
import org.apache.activemq.flow.Commands.Message.MessageBuffer;
import org.apache.activemq.protobuf.UTF8Buffer;
-import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.util.Mapper;
public class Message implements Serializable {
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Mon Apr 27 18:40:44 2009
@@ -35,6 +35,11 @@
public interface DeliveryTarget {
public IFlowSink<Message> getSink();
+ /**
+ * @return true if this sub has a selector
+ */
+ public boolean hasSelector();
+
public boolean match(Message message);
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Mon Apr 27 18:40:44 2009
@@ -26,11 +26,11 @@
import org.apache.activemq.flow.Commands.Destination.DestinationBean;
import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.util.Mapper;
public class MockBrokerTest extends TestCase {
- protected static final int PERFORMANCE_SAMPLES = 5;
+ protected static final int PERFORMANCE_SAMPLES = 3;
protected static final int SAMPLING_FREQUENCY = 5;
protected static final int FANIN_COUNT = 10;
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Mon Apr 27 18:40:44 2009
@@ -8,11 +8,12 @@
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Mapper;
import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.SharedPriorityQueue;
import org.apache.activemq.queue.SharedQueue;
import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.util.Mapper;
class MockQueue implements MockBroker.DeliveryTarget {
@@ -23,22 +24,21 @@
private Mapper<Integer, Message> partitionMapper;
private Mapper<Long, Message> keyExtractor;
+ private final MockStoreAdapater store = new MockStoreAdapater();
private IQueue<Long, Message> createQueue() {
if (partitionMapper != null) {
- PartitionedQueue<Integer, Long, Message> queue = new PartitionedQueue<Integer, Long, Message>() {
+ PartitionedQueue<Long, Message> queue = new PartitionedQueue<Long, Message>(destination.getName().toString()) {
@Override
- protected IQueue<Long, Message> cratePartition(Integer partitionKey) {
+ public IQueue<Long, Message> createPartition(int partitionKey) {
return createSharedFlowQueue();
}
-
- public boolean isElementPersistent(Message message) {
- return false;
- }
};
queue.setPartitionMapper(partitionMapper);
queue.setResourceName(destination.getName().toString());
+ queue.setStore(store);
+ queue.initialize(0, 0, 0, 0);
return queue;
} else {
return createSharedFlowQueue();
@@ -53,6 +53,8 @@
queue.setKeyMapper(keyExtractor);
queue.setAutoRelease(true);
queue.setDispatcher(broker.getDispatcher());
+ queue.setStore(store);
+ queue.initialize(0, 0, 0, 0);
return queue;
} else {
SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
@@ -60,6 +62,8 @@
queue.setKeyMapper(keyExtractor);
queue.setAutoRelease(true);
queue.setDispatcher(broker.getDispatcher());
+ queue.setStore(store);
+ queue.initialize(0, 0, 0, 0);
return queue;
}
}
@@ -77,6 +81,10 @@
public boolean isPreAcquired() {
return true;
}
+
+ public boolean isBrowser() {
+ return false; // always false: never reported as a browser
+ }
public boolean matches(Message message) {
return dt.match(message);
@@ -94,6 +102,14 @@
public String toString() {
return getSink().toString();
}
+
+ public boolean hasSelector() {
+ return dt.hasSelector(); // defers to whatever dt itself reports
+ }
+
+ public boolean offer(Message elem, ISourceController<Message> controller, SubscriptionDeliveryCallback ackCallback) {
+ return getSink().offer(elem, controller); // forwards to the sink; ackCallback is accepted but not passed on
+ }
};
subs.put(dt, sub);
queue.addSubscription(sub);
@@ -109,6 +125,7 @@
public void start() throws Exception {
queue = createQueue();
+ queue.start();
}
public void stop() throws Exception {
@@ -118,6 +135,10 @@
return queue;
}
+ public boolean hasSelector() {
+ return false; // always false for this delivery target
+ }
+
public boolean match(Message message) {
return true;
}
@@ -150,4 +171,40 @@
this.destination = destination;
}
+ static final class MockStoreAdapater implements QueueStore<Long, Message> {
+ // Do-nothing QueueStore: nothing is persisted, and restoring elements is unsupported.
+ MockStoreAdapater() {
+ // No state to set up.
+ }
+
+ public final void deleteQueueElement(QueueStore.QueueDescriptor descriptor, Message elem) {
+ // No-op: this adapter never stores elements, so there is nothing to delete.
+ }
+
+ public final boolean isElemPersistent(Message elem) {
+ return false;
+ }
+
+ public final boolean isFromStore(Message elem) {
+ return false;
+ }
+
+ public final void persistQueueElement(QueueStore.QueueDescriptor descriptor, ISourceController<?> controller, Message elem, long sequence, boolean delayable) throws Exception {
+ // No-op: the element is not written anywhere.
+ }
+
+ public final void restoreQueueElements(QueueStore.QueueDescriptor queue, long firstSequence, long maxSequence, int maxCount, QueueStore.RestoreListener<Message> listener) {
+ throw new UnsupportedOperationException("Mock broker doesn't support persistence");
+ }
+
+ public final void addQueue(QueueStore.QueueDescriptor queue) {
+ // No-op: queues are not tracked by this adapter.
+ }
+
+ public final void deleteQueue(QueueStore.QueueDescriptor queue) {
+ // No-op: queues are not tracked by this adapter.
+ }
+
+ }
+
}
\ No newline at end of file