You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/29 17:33:45 UTC

svn commit: r789361 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java

Author: chirino
Date: Mon Jun 29 15:33:41 2009
New Revision: 789361

URL: http://svn.apache.org/viewvc?rev=789361&view=rev
Log:
BrokerDatabase now exposes 2 properties:
 * flushDelay - Delay in milliseconds before a delayable flush is scheduled; a value of 0 disables the delay.
 * storeBypass - Allows you to disable canceling database operations.
 

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=789361&r1=789360&r2=789361&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Mon Jun 29 15:33:41 2009
@@ -58,6 +58,7 @@
 public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase> implements Service, DispatcherAware {
 
     private static final boolean DEBUG = false;
+    
     private final Store store;
     private final Flow databaseFlow = new Flow("database", false);
 
@@ -80,8 +81,10 @@
     // num scheduled for delay
     private long delayedFlushPointer = 0; // The last delayable sequence num
     // requested.
-    private final long FLUSH_DELAY_MS = 10;
-    private final Runnable flushDelayCallback;
+    private long flushDelay = 10;
+
+	private final Runnable flushDelayCallback;
+    private boolean storeBypass = true;
 
     public interface DatabaseListener {
         /**
@@ -283,7 +286,7 @@
             op.opSequenceNumber = opSequenceNumber++;
             opQueue.addLast(op);
             if (op.flushRequested || storeLimiter.getThrottled()) {
-                if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
+                if (op.isDelayable() && flushDelay > 0) {
                     scheduleDelayedFlush(op.opSequenceNumber);
                 } else {
                     updateFlushPointer(op.opSequenceNumber);
@@ -313,7 +316,7 @@
 
         if (requestedDelayedFlushPointer == -1) {
             requestedDelayedFlushPointer = delayedFlushPointer;
-            dispatcher.schedule(flushDelayCallback, FLUSH_DELAY_MS, TimeUnit.MILLISECONDS);
+            dispatcher.schedule(flushDelayCallback, flushDelay, TimeUnit.MILLISECONDS);
         }
 
     }
@@ -705,15 +708,17 @@
         public static final int BASE_MEM_SIZE = 20;
 
         public boolean cancel() {
-            if (executePending.compareAndSet(true, false)) {
-                cancelled.set(true);
-                // System.out.println("Cancelled: " + this);
-                synchronized (opQueue) {
-                    unlink();
-                    storeController.elementDispatched(this);
-                }
-                return true;
-            }
+        	if( storeBypass ) {
+	            if (executePending.compareAndSet(true, false)) {
+	                cancelled.set(true);
+	                // System.out.println("Cancelled: " + this);
+	                synchronized (opQueue) {
+	                    unlink();
+	                    storeController.elementDispatched(this);
+	                }
+	                return true;
+	            }
+        	}
             return cancelled.get();
         }
 
@@ -1276,4 +1281,35 @@
             return "AddTxOpOperation " + record.getKey() + super.toString();
         }
     }
+    
+    public long getFlushDelay() {
+		return flushDelay;
+	}
+
+	public void setFlushDelay(long flushDelay) {
+		this.flushDelay = flushDelay;
+	}
+
+	/**
+	 * @return true if operations are allowed to bypass the store.
+	 */
+	public boolean isStoreBypass() {
+		return storeBypass;
+	}
+
+	/**
+	 * Sets if persistent operations should be allowed to bypass the store.
+	 * Defaults to true, as this will give you the best performance.  In some  
+	 * cases, you want to disable this as the store being used will double
+	 * as an audit log and you do not want any persistent operations
+	 * to bypass the store.
+	 * 
+	 * When store bypass is disabled, all {@link Operation#cancel()} requests
+	 * will return false.
+	 * 
+	 * @param enable if true will enable store bypass
+	 */
+	public void setStoreBypass(boolean enable) {
+		this.storeBypass = enable;
+	}
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=789361&r1=789360&r2=789361&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Mon Jun 29 15:33:41 2009
@@ -24,11 +24,14 @@
 
 import javax.jms.JMSException;
 
+import junit.framework.TestCase;
+
+import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerDatabase;
 import org.apache.activemq.apollo.broker.BrokerQueueStore;
-import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -58,20 +61,19 @@
 import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
-
-import junit.framework.TestCase;
 
 public class SharedQueuePerfTest extends TestCase {
 
-    private static int PERFORMANCE_SAMPLES = 5;
+    private static int PERFORMANCE_SAMPLES = 500000;
 
     IDispatcher dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
-    private static final boolean PERSISTENT = false;
+    private static final boolean PERSISTENT = true;
     private static final boolean PURGE_STORE = true;
+    // Producers send sync and operations are never canceled. 
+    private static final boolean TEST_MAX_STORE_LATENCY = true;
     private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
@@ -96,6 +98,10 @@
         dispatcher.start();
         database = new BrokerDatabase(createStore());
         database.setDispatcher(dispatcher);
+        if( TEST_MAX_STORE_LATENCY ) {
+        	database.setFlushDelay(0);
+        	database.setStoreBypass(false);
+        }
         database.start();
         queueStore = new BrokerQueueStore();
         queueStore.setDatabase(database);
@@ -286,7 +292,8 @@
     class Producer implements Dispatchable, FlowUnblockListener<OpenWireMessageDelivery> {
         private AtomicBoolean stopped = new AtomicBoolean(false);
         private String name;
-        protected final MetricCounter rate = new MetricCounter();
+        protected final MetricCounter sendRate = new MetricCounter();
+		AtomicBoolean waitingForAck = new AtomicBoolean();
         private final DispatchContext dispatchContext;
 
         protected IFlowController<OpenWireMessageDelivery> outboundController;
@@ -303,8 +310,8 @@
 
         public Producer(String name, IQueue<Long, MessageDelivery> targetQueue) {
             this.name = name;
-            rate.name("Producer " + name + " Rate");
-            totalProducerRate.add(rate);
+            sendRate.name("Producer " + name + " Rate");
+            totalProducerRate.add(sendRate);
             dispatchContext = dispatcher.register(this, name);
             // create a 1024 byte payload (2 bytes per char):
             payload = new String(new byte[512]);
@@ -356,9 +363,32 @@
         }
 
         public boolean dispatch() {
+            // If flow controlled stop until flow control is lifted.
+            if (outboundController.isSinkBlocked()) {
+                if (outboundController.addUnblockListener(this)) {
+                    return true;
+                }
+            }
+
+            if( TEST_MAX_STORE_LATENCY ) {
+            	// We can't send again until we get persist ack.
+            	if( waitingForAck.get() ) {
+                    return true;
+            	}
+            }
+            
             if (next == null) {
                 try {
-                    createNextMessage();
+                	next = createNextMessage();
+					if (TEST_MAX_STORE_LATENCY) {
+						waitingForAck.set(true);
+						next.setPersistListener(new PersistListener() {
+							public void onMessagePersisted(OpenWireMessageDelivery delivery) {
+								waitingForAck.set(false);
+								dispatchContext.requestDispatch();
+							}
+						});
+					}
                 } catch (JMSException e) {
                     e.printStackTrace();
                     stopped.set(true);
@@ -366,20 +396,13 @@
                 }
             }
 
-            // If flow controlled stop until flow control is lifted.
-            if (outboundController.isSinkBlocked()) {
-                if (outboundController.addUnblockListener(this)) {
-                    return true;
-                }
-            }
-
+            sendRate.increment();
             outboundQueue.add(next, null);
-            rate.increment();
             next = null;
             return stopped.get();
         }
 
-        private void createNextMessage() throws JMSException {
+        private OpenWireMessageDelivery createNextMessage() throws JMSException {
             ActiveMQTextMessage message = new ActiveMQTextMessage();
             message.setJMSPriority(priority);
             message.setProducerId(producerId);
@@ -389,7 +412,7 @@
             if (payload != null) {
                 message.setText(payload);
             }
-            next = new OpenWireMessageDelivery(message);
+            return new OpenWireMessageDelivery(message);
         }
 
         public void onFlowUnblocked(ISinkController<OpenWireMessageDelivery> controller) {