You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/29 17:33:45 UTC
svn commit: r789361 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
Author: chirino
Date: Mon Jun 29 15:33:41 2009
New Revision: 789361
URL: http://svn.apache.org/viewvc?rev=789361&view=rev
Log:
BrokerDatabase now exposed 2 properties:
* flushDelay
* storeBypass - Allows you to disable canceling database operations.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=789361&r1=789360&r2=789361&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Mon Jun 29 15:33:41 2009
@@ -58,6 +58,7 @@
public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase> implements Service, DispatcherAware {
private static final boolean DEBUG = false;
+
private final Store store;
private final Flow databaseFlow = new Flow("database", false);
@@ -80,8 +81,10 @@
// num scheduled for delay
private long delayedFlushPointer = 0; // The last delayable sequence num
// requested.
- private final long FLUSH_DELAY_MS = 10;
- private final Runnable flushDelayCallback;
+ private long flushDelay = 10;
+
+ private final Runnable flushDelayCallback;
+ private boolean storeBypass = true;
public interface DatabaseListener {
/**
@@ -283,7 +286,7 @@
op.opSequenceNumber = opSequenceNumber++;
opQueue.addLast(op);
if (op.flushRequested || storeLimiter.getThrottled()) {
- if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
+ if (op.isDelayable() && flushDelay > 0) {
scheduleDelayedFlush(op.opSequenceNumber);
} else {
updateFlushPointer(op.opSequenceNumber);
@@ -313,7 +316,7 @@
if (requestedDelayedFlushPointer == -1) {
requestedDelayedFlushPointer = delayedFlushPointer;
- dispatcher.schedule(flushDelayCallback, FLUSH_DELAY_MS, TimeUnit.MILLISECONDS);
+ dispatcher.schedule(flushDelayCallback, flushDelay, TimeUnit.MILLISECONDS);
}
}
@@ -705,15 +708,17 @@
public static final int BASE_MEM_SIZE = 20;
public boolean cancel() {
- if (executePending.compareAndSet(true, false)) {
- cancelled.set(true);
- // System.out.println("Cancelled: " + this);
- synchronized (opQueue) {
- unlink();
- storeController.elementDispatched(this);
- }
- return true;
- }
+ if( storeBypass ) {
+ if (executePending.compareAndSet(true, false)) {
+ cancelled.set(true);
+ // System.out.println("Cancelled: " + this);
+ synchronized (opQueue) {
+ unlink();
+ storeController.elementDispatched(this);
+ }
+ return true;
+ }
+ }
return cancelled.get();
}
@@ -1276,4 +1281,35 @@
return "AddTxOpOperation " + record.getKey() + super.toString();
}
}
+
+ public long getFlushDelay() {
+ return flushDelay;
+ }
+
+ public void setFlushDelay(long flushDelay) {
+ this.flushDelay = flushDelay;
+ }
+
+ /**
+ * @return true if operations are allowed to bypass the store.
+ */
+ public boolean isStoreBypass() {
+ return storeBypass;
+ }
+
+ /**
+ * Sets if persistent operations should be allowed to bypass the store.
+ * Defaults to true, as this will give you the best performance. In some
+ * cases, you want to disable this as the store being used will double
+ * as an audit log and you do not want any persistent operations
+ * to bypass the store.
+ *
+ * When store bypass is disabled, all {@link Operation#cancel()} requests
+ * will return false.
+ *
+ * @param enable if true will enable store bypass
+ */
+ public void setStoreBypass(boolean enable) {
+ this.storeBypass = enable;
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=789361&r1=789360&r2=789361&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Mon Jun 29 15:33:41 2009
@@ -24,11 +24,14 @@
import javax.jms.JMSException;
+import junit.framework.TestCase;
+
+import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.BrokerDatabase;
import org.apache.activemq.apollo.broker.BrokerQueueStore;
-import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
import org.apache.activemq.command.ActiveMQDestination;
@@ -58,20 +61,19 @@
import org.apache.activemq.queue.QueueDispatchTarget;
import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
-
-import junit.framework.TestCase;
public class SharedQueuePerfTest extends TestCase {
- private static int PERFORMANCE_SAMPLES = 5;
+ private static int PERFORMANCE_SAMPLES = 500000;
IDispatcher dispatcher;
BrokerDatabase database;
BrokerQueueStore queueStore;
private static final boolean USE_KAHA_DB = true;
- private static final boolean PERSISTENT = false;
+ private static final boolean PERSISTENT = true;
private static final boolean PURGE_STORE = true;
+ // Producers send sync and operations are never canceled.
+ private static final boolean TEST_MAX_STORE_LATENCY = true;
private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
@@ -96,6 +98,10 @@
dispatcher.start();
database = new BrokerDatabase(createStore());
database.setDispatcher(dispatcher);
+ if( TEST_MAX_STORE_LATENCY ) {
+ database.setFlushDelay(0);
+ database.setStoreBypass(false);
+ }
database.start();
queueStore = new BrokerQueueStore();
queueStore.setDatabase(database);
@@ -286,7 +292,8 @@
class Producer implements Dispatchable, FlowUnblockListener<OpenWireMessageDelivery> {
private AtomicBoolean stopped = new AtomicBoolean(false);
private String name;
- protected final MetricCounter rate = new MetricCounter();
+ protected final MetricCounter sendRate = new MetricCounter();
+ AtomicBoolean waitingForAck = new AtomicBoolean();
private final DispatchContext dispatchContext;
protected IFlowController<OpenWireMessageDelivery> outboundController;
@@ -303,8 +310,8 @@
public Producer(String name, IQueue<Long, MessageDelivery> targetQueue) {
this.name = name;
- rate.name("Producer " + name + " Rate");
- totalProducerRate.add(rate);
+ sendRate.name("Producer " + name + " Rate");
+ totalProducerRate.add(sendRate);
dispatchContext = dispatcher.register(this, name);
// create a 1024 byte payload (2 bytes per char):
payload = new String(new byte[512]);
@@ -356,9 +363,32 @@
}
public boolean dispatch() {
+ // If flow controlled stop until flow control is lifted.
+ if (outboundController.isSinkBlocked()) {
+ if (outboundController.addUnblockListener(this)) {
+ return true;
+ }
+ }
+
+ if( TEST_MAX_STORE_LATENCY ) {
+ // We can't send again until we get persist ack.
+ if( waitingForAck.get() ) {
+ return true;
+ }
+ }
+
if (next == null) {
try {
- createNextMessage();
+ next = createNextMessage();
+ if (TEST_MAX_STORE_LATENCY) {
+ waitingForAck.set(true);
+ next.setPersistListener(new PersistListener() {
+ public void onMessagePersisted(OpenWireMessageDelivery delivery) {
+ waitingForAck.set(false);
+ dispatchContext.requestDispatch();
+ }
+ });
+ }
} catch (JMSException e) {
e.printStackTrace();
stopped.set(true);
@@ -366,20 +396,13 @@
}
}
- // If flow controlled stop until flow control is lifted.
- if (outboundController.isSinkBlocked()) {
- if (outboundController.addUnblockListener(this)) {
- return true;
- }
- }
-
+ sendRate.increment();
outboundQueue.add(next, null);
- rate.increment();
next = null;
return stopped.get();
}
- private void createNextMessage() throws JMSException {
+ private OpenWireMessageDelivery createNextMessage() throws JMSException {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setJMSPriority(priority);
message.setProducerId(producerId);
@@ -389,7 +412,7 @@
if (payload != null) {
message.setText(payload);
}
- next = new OpenWireMessageDelivery(message);
+ return new OpenWireMessageDelivery(message);
}
public void onFlowUnblocked(ISinkController<OpenWireMessageDelivery> controller) {