You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/19 20:07:40 UTC

svn commit: r756137 - /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java

Author: chirino
Date: Thu Mar 19 19:07:39 2009
New Revision: 756137

URL: http://svn.apache.org/viewvc?rev=756137&view=rev
Log:
Making the store interface more async

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=756137&r1=756136&r2=756137&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Thu Mar 19 19:07:39 2009
@@ -1,13 +1,19 @@
 package org.apache.activemq.broker.store;
 
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.CancellationException;
 
+import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
 /**
- * Interface to persistently store and access data needed by the messaging system. 
- *  
+ * Interface to persistently store and access data needed by the messaging
+ * system.
+ * 
  */
 public interface Store {
 
@@ -15,101 +21,185 @@
     }
     
     /**
-     * This interface allows you to query and update the Store. 
+     * This interface allows you to query and update the Store.
      * 
-     * This interface should only be called within the context of a transaction 
+     * This interface should only be called within the context of a transaction
      * controlled by the {@link Store#execute(Callback)} mehtod.
      * 
      */
     public interface Session {
-        
-        // Message related methods. 
+
+        // Message related methods.
         public RecordKey messageAdd(AsciiBuffer messageId, Buffer message);
+
         public RecordKey messageGetKey(AsciiBuffer messageId);
+
         public Buffer messageGet(RecordKey key);
 
-        // Message Chunking related methods. 
+        // Message Chunking related methods.
         public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message);
+
         public void messageChunkAdd(RecordKey key, Buffer message);
+
         public void messageChunkClose(RecordKey key);
+
         public Buffer messageChunkGet(RecordKey key, int offset, int max);
-        
-        /// Queue related methods.
+
+        // / Queue related methods.
         public Iterator<AsciiBuffer> queueList(AsciiBuffer first, int max);
+
         public void queueAdd(AsciiBuffer queue);
+
         public boolean queueRemove(AsciiBuffer queue);
-        
+
         public void queueAddMessage(AsciiBuffer queue, RecordKey key);
+
         public void queueRemoveMessage(AsciiBuffer queue, RecordKey key);
+
         public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
-        
-        // We could use this to associate additional data to a message on a queue like 
+
+        // We could use this to associate additional data to a message on a
+        // queue like
         // which consumer a message has been dispatched to.
         public void queueSetMessageAttachment(AsciiBuffer queue, RecordKey key, Buffer attachment);
+
         public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey key);
-                
-        /// Simple Key Value related methods could come in hand to store misc data.
+
+        // / Simple Key Value related methods could come in handy to store misc
+        // data.
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
+
         public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value);
+
         public Buffer mapGet(AsciiBuffer map, Buffer key);
+
         public Buffer mapRemove(AsciiBuffer map, Buffer key);
+
         public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max);
 
     }
-    
+
     /**
-     *  
-     * This interface is used to execute transacted code which returns a result.  
-     * 
-     * It is used by the {@link Store#execute(Callback)} method, 
-     * often as anonymous class.
+     * This interface is used to execute transacted code.
      * 
-     * @param <R> The type of result that the CallableCallback produces.
-     * @param <T> The type of exceptions that CallableCallback will throw.
+     * It is used by the {@link Store#execute(Callback)} method, often as
+     * anonymous class.
      */
-    public interface Callback<R, T extends Throwable> {
-        
+    public interface Operation {
+
         /**
-         * Gets called by the {@link Store#execute(Callback)} method within a transactional context.  
-         * If any exception is thrown including Runtime exception, the transaction is rolled back.
+         * Gets called by the {@link Store#add(Operation, ISourceController, boolean)} method within a
+         * transactional context. If any exception is thrown including Runtime
+         * exception, the transaction is rolled back.
          * 
-         * @param session provides you access to read and update the persistent data.
+         * @param session
+         *            provides you access to read and update the persistent
+         *            data.
          * @return the result of the CallableCallback
-         * @throws T if an error occurs and the transaction should get rolled back
+         * @throws CancellationException
+         *             if the operation has been canceled. If this is thrown, 
+         *             the {@link #onCommit()} and {@link #onRollback()} methods will
+         *             not be called. 
+         * @throws Exception
+         *             if an system error occured while executing the operations.
+         * @throws RuntimeException
+         *             if an system error occured while executing the operations.
+         */
+        public void execute(Session session) throws CancellationException, Exception, RuntimeException;
+        
+        /**
+         * Returns true if this operation can be delayed. This is useful in cases 
+         * where external events can negate the need to execute the operation. The delay
+         * interval is not guaranteed to be honored, if subsequent events or other 
+         * store flush policy/criteria requires a flush of subsequent events. 
+         * 
+         * @return True if the operation can be delayed. 
          */
-        public R execute(Session session) throws T;
+        public boolean isDelayable();
+        
+        /**
+         * Attempts to cancel the store operation. Returns true if the operation
+         * could be canceled or false if the operation was already executed by the 
+         * store. 
+         * 
+         * @return true if the operation could be canceled
+         */
+        public boolean cancel();
+        
+        /**
+         * Returns the size to be used when calculating how much space this operation
+         * takes on the store processing queue. 
+         * 
+         * @return The limiter size to be used. 
+         */
+        public long getLimiterSize();
+        
+        /**
+         * Called after {@link #execute(Session)} is called and the the operation has been committed. 
+         */
+        public void onCommit();
+        
+        /**
+         * Called after {@link #execute(Session)} is called and the the operation has been rolled back. 
+         */
+        public void onRollback(Throwable error);
     }
     
     /**
-     * Convenience class which allows you to implement {@link Callback} classes which do not return a value.
+     * This is a convenience base class that can be used to implement Operations.  
+     * It handles operation cancellation for you. 
      */
-    public abstract class VoidCallback <T extends Throwable> implements Callback<Object, T> {
+    public abstract class OperationBase implements Operation {
+        final private AtomicBoolean executePending = new AtomicBoolean(true);
         
-        /**
-         * Gets called by the {@link Store#execute(VoidCallback)} method within a transactional context.  
-         * If any exception is thrown including Runtime exception, the transaction is rolled back.
-         * 
-         * @param session provides you access to read and update the persistent data.
-         * @throws T if an error occurs and the transaction should get rolled back
-         */
-        abstract public void run(Session session) throws T;
+        public boolean cancel() {
+            return executePending.compareAndSet(true, false);
+        }
+
+        public void execute(Session session) throws CancellationException {
+            if( executePending.compareAndSet(true, false) ) {
+                doExcecute(session);
+            } else {
+                throw new CancellationException();
+            }
+        }
+
+        abstract protected void doExcecute(Session session);
         
-        final public Object execute(Session session) throws T {
-            run(session);
-            return null;
+        public long getLimiterSize() {
+            return 0;
+        }
+
+        public boolean isDelayable() {
+            return false;
+        }
+
+        public void onCommit() {
+        }
+
+        public void onRollback() {
         }
     }
 
     /**
-     * Executes user supplied {@link Callback}.  If the {@link Callback} does not throw
-     * any Exceptions, all updates to the store are committed, otherwise they
-     * are rolled back. Any exceptions thrown by the {@link Callback} are propagated by
-     * this method.
+     * Executes user supplied {@link Operation}. If the {@link Operation} does not
+     * throw any Exceptions, all updates to the store are committed, otherwise
+     * they are rolled back. Any exceptions thrown by the {@link Operation} are
+     * propagated by this method.
+     * 
+     * If limiter space on the store processing queue is exceeded, the controller will be
+     * blocked. 
      * 
-     * @param <T>
-     * @param closure
+     * If this method is called with flush set to <code>false</false> there is no 
+     * guarantee made about when the operation will be executed. If <code>flush</code> is 
+     * <code>true</code> and {@link Operation#isDelayable()} is also <code>true</code>
+     * then an attempt will be made to execute the event at the {@link Store}'s configured
+     * delay interval. 
+     * 
+     * @param op The operation to execute
+     * @param flush Whether or not this operation needs immediate processing. 
+     * @param controller the source of the operation. 
      */
-    public <R, T extends Throwable> R execute(Callback<R,T> closure);
-    
-    
+    public void add(Operation op, ISourceController<?> controller, boolean flush);
+
 }