You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/20 17:15:44 UTC

svn commit: r756565 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/activemq/...

Author: chirino
Date: Fri Mar 20 16:15:43 2009
New Revision: 756565

URL: http://svn.apache.org/viewvc?rev=756565&view=rev
Log:
More store work

Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri Mar 20 16:15:43 2009
@@ -18,6 +18,7 @@
             this.flow = flow;
         }
 
+        @Override
         public void reserve(E elem) {
             super.reserve(elem);
 //            if (!clientMode) {
@@ -25,14 +26,16 @@
 //            }
         }
 
+        /*
         public void releaseReserved(E elem) {
             super.reserve(elem);
 //            if (!clientMode) {
 //                System.out.println(name + " Released Reserved " + this);
 //            }
-        }
-
-        protected void remove(int size) {
+        }*/
+        
+        @Override
+        public void remove(long size) {
             super.remove(size);
             if (!clientMode) {
                 available += size;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Fri Mar 20 16:15:43 2009
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface DeliveryTarget {
     
@@ -25,4 +26,7 @@
     
     public boolean match(MessageDelivery message);
     
+    public boolean isDurable();
+    
+    public AsciiBuffer getPersistentQueueName();
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Fri Mar 20 16:15:43 2009
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 
 public interface MessageDelivery {
 
@@ -37,5 +38,18 @@
     public <T> T asType(Class<T> type);
 
     public boolean isPersistent();
-
+    
+    /**
+     * Assigns a tracking number to this MessageDelivery. Tracking numbers
+     * are assigned sequentially and are unique within the broker. 
+     */
+    public void setTrackingNumber(long tracking);
+    
+    public long getTrackingNumber();
+
+    /**
+     * Returns the message's buffer representation.
+     * @return
+     */
+    public Buffer getMessageBuffer();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Fri Mar 20 16:15:43 2009
@@ -172,4 +172,13 @@
         this.destination = destination;
     }
 
+    public AsciiBuffer getPersistentQueueName() {
+        // TODO Auto-generated method stub
+        return destination.getName();
+    }
+
+    public boolean isDurable() {
+        return true;
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Mar 20 16:15:43 2009
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 
 public class OpenWireMessageDelivery implements MessageDelivery {
 
@@ -27,6 +28,7 @@
     private Destination destination;
     private AsciiBuffer producerId;
     private Runnable completionCallback;
+    private long tracking;
 
     public OpenWireMessageDelivery(Message message) {
         this.message = message;
@@ -85,4 +87,21 @@
         return message.isPersistent();
     }
 
+    public void setTrackingNumber(long tracking) {
+        this.tracking = tracking;
+    }
+
+    public long getTrackingNumber() {
+        return tracking;
+    }
+    
+    /**
+     * Returns the message's buffer representation.
+     * 
+     * @return
+     */
+    public Buffer getMessageBuffer() {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Mar 20 16:15:43 2009
@@ -89,20 +89,20 @@
 
     protected final Object inboundMutex = new Object();
     protected IFlowController<MessageDelivery> inboundController;
-    
+
     protected BrokerConnection connection;
     private OpenWireFormat wireFormat;
-    private Router router; 
-    
+    private Router router;
+
     public void start() throws Exception {
         // Setup the inbound processing..
-        final Flow flow = new Flow("broker-"+connection.getName()+"-inbound", false);
+        final Flow flow = new Flow("broker-" + connection.getName() + "-inbound", false);
         SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
         inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
             public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
                 route(controller, elem);
             }
-        
+
             public String toString() {
                 return flow.getFlowName();
             }
@@ -113,7 +113,7 @@
     }
 
     public void onCommand(Object o) {
-        
+
         final Command command = (Command) o;
         boolean responseRequired = command.isResponseRequired();
         try {
@@ -198,13 +198,14 @@
                 // Control Methods
                 // /////////////////////////////////////////////////////////////////
                 public Response processWireFormat(WireFormatInfo info) throws Exception {
-                    
+
                     // Negotiate the openwire encoding options.
                     WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
                     wfn.sendWireFormat();
                     wfn.negociate(info);
-                    
-                    // Now that the encoding is negotiated.. let the client know the details about this
+
+                    // Now that the encoding is negotiated.. let the client know
+                    // the details about this
                     // broker.
                     BrokerInfo brokerInfo = new BrokerInfo();
                     brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
@@ -294,8 +295,8 @@
 
                 // /////////////////////////////////////////////////////////////////
                 // Methods for cluster operations
-                //    These commands are sent to the broker when it's acting like a 
-                //    client to another broker.
+                // These commands are sent to the broker when it's acting like a
+                // client to another broker.
                 // /////////////////////////////////////////////////////////////////
                 public Response processBrokerInfo(BrokerInfo info) throws Exception {
                     throw new UnsupportedOperationException();
@@ -308,11 +309,11 @@
                 public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
                     throw new UnsupportedOperationException();
                 }
-                
+
                 public Response processProducerAck(ProducerAck info) throws Exception {
                     return ack(command);
                 }
-                
+
             });
         } catch (Exception e) {
             if (responseRequired) {
@@ -325,11 +326,11 @@
 
         }
     }
-    
+
     public void onException(Exception error) {
-        if( !connection.isStopping() ) {
+        if (!connection.isStopping()) {
             error.printStackTrace();
-            new Thread(){
+            new Thread() {
                 @Override
                 public void run() {
                     try {
@@ -366,7 +367,7 @@
             return null;
         }
     }
-    
+
     class ProducerContext {
 
         private IFlowController<MessageDelivery> controller;
@@ -378,7 +379,7 @@
             // Openwire only uses credit windows at the producer level for
             // producers that request the feature.
             if (info.getWindowSize() > 0) {
-                final Flow flow = new Flow("broker-"+name+"-inbound", false);
+                final Flow flow = new Flow("broker-" + name + "-inbound", false);
                 WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
                     @Override
                     protected void sendCredit(int credit) {
@@ -407,7 +408,8 @@
         private final ConsumerInfo info;
         private String name;
         private BooleanExpression selector;
-
+        private boolean durable;
+        
         private SingleFlowRelay<MessageDelivery> queue;
         public WindowLimiter<MessageDelivery> limiter;
 
@@ -416,8 +418,8 @@
             this.name = info.getConsumerId().toString();
             selector = parseSelector(info);
 
-            Flow flow = new Flow("broker-"+name+"-outbound", false);
-            limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize()/2) {
+            Flow flow = new Flow("broker-" + name + "-outbound", false);
+            limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
                 public int getElementSize(MessageDelivery m) {
                     return 1;
                 }
@@ -436,7 +438,7 @@
         }
 
         public void ack(MessageAck info) {
-            synchronized(queue) {
+            synchronized (queue) {
                 limiter.onProtocolCredit(info.getMessageCount());
             }
         }
@@ -462,6 +464,14 @@
             }
         }
 
+        public boolean isDurable() {
+            return durable;
+        }
+
+        public AsciiBuffer getPersistentQueueName() {
+            return null;
+        }
+
     }
 
     protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
@@ -528,7 +538,7 @@
         }
         return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
     }
-    
+
     private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
         BooleanExpression rc = null;
         if (info.getSelector() != null) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Fri Mar 20 16:15:43 2009
@@ -19,6 +19,7 @@
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
 
@@ -30,6 +31,7 @@
     private String receiptId;
     private int priority = Integer.MIN_VALUE;
     private AsciiBuffer msgId;
+    private long tracking = -1;
 
     public StompMessageDelivery(StompFrame frame, Destination destiantion) {
         this.frame = frame;
@@ -99,4 +101,23 @@
         return "true".equals(p);
     }
 
+    public long getTrackingNumber() {
+        return tracking ;
+    }
+
+    public void setTrackingNumber(long tracking) {
+        this.tracking = tracking;
+    }
+    
+    /**
+     * Returns the message's buffer representation.
+     * @return
+     */
+    public Buffer getMessageBuffer()
+    {
+        //Todo use asType() instead?
+        throw new UnsupportedOperationException("not yet implemented");
+    }
+   
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri Mar 20 16:15:43 2009
@@ -263,6 +263,8 @@
         
         private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
 
+        private boolean durable;
+
         public ConsumerContext(final StompFrame subscribe) throws Exception {
             translator = translator(subscribe);
             
@@ -369,6 +371,14 @@
 //                return false;
 //            }
         }
+        
+        public boolean isDurable() {
+            return durable;
+        }
+
+        public AsciiBuffer getPersistentQueueName() {
+            return null;
+        }
 
     }
     

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=756565&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Fri Mar 20 16:15:43 2009
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Semaphore;
+
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.RecordKey;
+import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.Session.DuplicateKeyException;
+import org.apache.activemq.broker.store.Store.Session.QueueNotFoundException;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IPollableFlowSource;
+import org.apache.activemq.queue.IPollableFlowSource.FlowReadyListener;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+public class BrokerDatabase {
+
+    private Store store;
+    private final Flow databaseFlow = new Flow("database", false);
+
+    private final SizeLimiter<Operation> storeLimiter;
+    private Thread flushThread;
+    private final ExclusiveQueue<Operation> opQueue;
+    private AtomicBoolean running = new AtomicBoolean(false);
+    private final Semaphore opsReady = new Semaphore(0);
+    private final FlowReadyListener<Operation> enqueueListener;
+    private DatabaseListener listener;
+
+    public interface DatabaseListener {
+        /**
+         * Called if there is a catastrophic problem with the database.
+         * 
+         * @param ioe
+         *            The causing exception.
+         */
+        public void onDatabaseException(IOException ioe);
+    }
+
+    public BrokerDatabase(Store store) {
+        storeLimiter = new SizeLimiter<Operation>(1024 * 512, 0) {
+            public int getElementSize(Operation op) {
+                return op.getLimiterSize();
+            }
+        };
+        opQueue = new ExclusiveQueue<Operation>(databaseFlow, "DataBaseQueue", storeLimiter);
+        enqueueListener = new FlowReadyListener<Operation>() {
+
+            public void onFlowReady(IPollableFlowSource<Operation> source) {
+                opsReady.release();
+            }
+        };
+    }
+
+    public synchronized void start() {
+        if (flushThread == null) {
+
+            running.set(true);
+            opsReady.drainPermits();
+            flushThread = new Thread(new Runnable() {
+
+                public void run() {
+                    processOps();
+                }
+
+            }, "StoreThread");
+            flushThread.start();
+        }
+    }
+
+    public synchronized void stop() {
+        if (flushThread != null) {
+
+            running.set(false);
+            boolean interrupted = false;
+            while (true) {
+                opsReady.release();
+                try {
+                    flushThread.join();
+                    break;
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+            flushThread = null;
+        }
+    }
+
+    public void persistReceivedMessage(MessageDelivery delivery, Collection<DeliveryTarget> targets, ISourceController<?> source) {
+        add(new AddMessageOperation(delivery, targets), source, true);
+    }
+
+    /**
+     * Executes user supplied {@link Operation}. If the {@link Operation} does
+     * not throw any Exceptions, all updates to the store are committed,
+     * otherwise they are rolled back. Any exceptions thrown by the
+     * {@link Operation} are propagated by this method.
+     * 
+     * If limiter space on the store processing queue is exceeded, the
+     * controller will be blocked.
+     * 
+     * If this method is called with flush set to
+     * <code>false</false> there is no 
+     * guarantee made about when the operation will be executed. If <code>flush</code>
+     * is <code>true</code> and {@link Operation#isDelayable()} is also
+     * <code>true</code> then an attempt will be made to execute the event at
+     * the {@link Store}'s configured delay interval.
+     * 
+     * @param op
+     *            The operation to execute
+     * @param flush
+     *            Whether or not this operation needs immediate processing.
+     * @param controller
+     *            the source of the operation.
+     */
+    private void add(Operation op, ISourceController<?> controller, boolean flush) {
+        opQueue.add(op, controller);
+    }
+
+    private final void processOps() {
+        while (running.get()) {
+            final Operation firstOp;
+            synchronized (opQueue) {
+                firstOp = opQueue.poll();
+                if (firstOp == null) {
+                    opQueue.addFlowReadyListener(enqueueListener);
+                    opsReady.acquireUninterruptibly();
+                    continue;
+                }
+            }
+            
+            // The first operation we get, triggers a store transaction.
+            if (firstOp != null) {
+                final ArrayList<Operation> processedQueue = new ArrayList<Operation>();
+                try {
+                    store.execute(new Store.VoidCallback<Exception>(){
+                        @Override
+                        public void run(Session session) throws Exception {
+                            
+                            // Try to execute the operation against the session...
+                            try {
+                                firstOp.execute(session);
+                                processedQueue.add(firstOp);
+                            } catch (CancellationException ignore) {
+                            }
+
+                            // See if we can batch up some additional operations in 
+                            // this transaction.  
+                            
+                            Operation op;
+                            synchronized (opQueue) {
+                                op = opQueue.poll();
+                                if (op != null) {
+                                    try {
+                                        firstOp.execute(session);
+                                        processedQueue.add(op);
+                                    } catch (CancellationException ignore) {
+                                    }
+                                }
+                            }
+                        }
+                    });
+                    // Wait for the operations to commit.
+                    for (Operation processed : processedQueue) {
+                        processed.onCommit();
+                    }
+                } catch (IOException e) {
+                    for (Operation processed : processedQueue) {
+                        processed.onRollback(e);
+                    }
+                    onDatabaseException(e);
+                } catch (RuntimeException e) {
+                    for (Operation processed : processedQueue) {
+                        processed.onRollback(e);
+                    }
+                } catch (Exception e) {
+                    for (Operation processed : processedQueue) {
+                        processed.onRollback(e);
+                    }
+                    
+                }
+            }
+        }
+    }
+
+    private void onDatabaseException(IOException ioe) {
+        if (listener != null) {
+            listener.onDatabaseException(ioe);
+        }
+    }
+
+    /**
+     * This interface is used to execute transacted code.
+     * 
+     * It is used by the {@link Store#execute(Callback)} method, often as
+     * anonymous class.
+     */
+    public interface Operation {
+
+        /**
+         * Gets called by the
+         * {@link Store#add(Operation, ISourceController, boolean)} method
+         * within a transactional context. If any exception is thrown including
+         * Runtime exception, the transaction is rolled back.
+         * 
+         * @param session
+         *            provides you access to read and update the persistent
+         *            data.
+         * @return the result of the CallableCallback
+         * @throws CancellationException
+         *             if the operation has been canceled. If this is thrown,
+         *             the {@link #onCommit()} and {@link #onRollback()} methods
+         *             will not be called.
+         * @throws Exception
+         *             if an system error occured while executing the
+         *             operations.
+         * @throws RuntimeException
+         *             if an system error occured while executing the
+         *             operations.
+         */
+        public void execute(Session session) throws CancellationException, Exception, RuntimeException;
+
+        /**
+         * Returns true if this operation can be delayed. This is useful in
+         * cases where external events can negate the need to execute the
+         * operation. The delay interval is not guaranteed to be honored, if
+         * subsequent events or other store flush policy/criteria requires a
+         * flush of subsequent events.
+         * 
+         * @return True if the operation can be delayed.
+         */
+        public boolean isDelayable();
+
+        /**
+         * Attempts to cancel the store operation. Returns true if the operation
+         * could be canceled or false if the operation was already executed by
+         * the store.
+         * 
+         * @return true if the operation could be canceled
+         */
+        public boolean cancel();
+
+        /**
+         * Returns the size to be used when calculating how much space this
+         * operation takes on the store processing queue.
+         * 
+         * @return The limiter size to be used.
+         */
+        public int getLimiterSize();
+
+        /**
+         * Called after {@link #execute(Session)} is called and the the
+         * operation has been committed.
+         */
+        public void onCommit();
+
+        /**
+         * Called after {@link #execute(Session)} is called and the the
+         * operation has been rolled back.
+         */
+        public void onRollback(Throwable error);
+    }
+
+    /**
+     * This is a convenience base class that can be used to implement
+     * Operations. It handles operation cancellation for you.
+     */
+    public abstract class OperationBase implements Operation {
+        final private AtomicBoolean executePending = new AtomicBoolean(true);
+
+        public boolean cancel() {
+            return executePending.compareAndSet(true, false);
+        }
+
+        public void execute(Session session) throws CancellationException {
+            if (executePending.compareAndSet(true, false)) {
+                doExcecute(session);
+            } else {
+                throw new CancellationException();
+            }
+        }
+
+        abstract protected void doExcecute(Session session);
+
+        public int getLimiterSize() {
+            return 0;
+        }
+
+        public boolean isDelayable() {
+            return false;
+        }
+
+        public void onCommit() {
+        }
+
+        public void onRollback() {
+        }
+    }
+
+    private class AddMessageOperation extends OperationBase {
+        private final MessageDelivery delivery;
+        private final Collection<DeliveryTarget> targets;
+        private final Buffer messageBuffer;
+        
+        public AddMessageOperation(MessageDelivery delivery, Collection<DeliveryTarget> targets) {
+            this.delivery = delivery;
+            this.messageBuffer = delivery.getMessageBuffer();
+            this.targets = targets;
+            
+        }
+
+        public int getLimiterSize() {
+            return delivery.getFlowLimiterSize();
+        }
+
+        @Override
+        protected void doExcecute(Session session) {
+            // TODO need to get at protocol buffer.
+            RecordKey key = session.messageAdd(delivery.getMsgId(), messageBuffer);
+            for(DeliveryTarget target : targets)
+            {
+                try {
+                    session.queueAddMessage(new AsciiBuffer(target.getPersistentQueueName()), key, null);
+                } catch (QueueNotFoundException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                } catch (DuplicateKeyException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        public void onRollback(Throwable error) {
+            // TODO Auto-generated method stub
+        }
+
+        public void onCommit() {
+            // Notify that we've saved the message.
+            delivery.getCompletionCallback().run();
+        }
+
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Fri Mar 20 16:15:43 2009
@@ -1,23 +1,65 @@
 package org.apache.activemq.broker.store;
 
-import java.util.Collection;
 import java.util.Iterator;
-import java.util.concurrent.CancellationException;
 
-import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Interface to persistently store and access data needed by the messaging
  * system.
- * 
  */
 public interface Store {
+    
+    /**
+     * This interface is used to execute transacted code.
+     * 
+     * It is used by the {@link Store#execute(Callback)} method, often as
+     * anonymous class.
+     */
+    public interface Callback<R, T extends Exception> {
 
-    public interface RecordKey {
+        /**
+         * Gets called by the {@link Store#execute(Callback)} method
+         * within a transactional context. If any exception is thrown including
+         * Runtime exception, the transaction is rolled back.
+         * 
+         * @param session
+         *            provides you access to read and update the persistent
+         *            data.
+         * @return the result of the Callback
+         * @throws T
+         *            if an system error occured while executing the
+         *            operations.
+         */
+        public R execute(Session session) throws T;
+    }
+    
+    /**
+     * Convenience class which allows you to implement {@link Callback} classes which do not return a value.
+     */
+    public abstract class VoidCallback <T extends Exception> implements Callback<Object, T> {
+        
+        /**
+         * Gets called by the {@link Store#execute(VoidCallback)} method within a transactional context.  
+         * If any exception is thrown including Runtime exception, the transaction is rolled back.
+         * 
+         * @param session provides you access to read and update the persistent data.
+         * @throws T if an error occurs and the transaction should get rolled back
+         */
+        abstract public void run(Session session) throws T;
+        
+        final public Object execute(Session session) throws T {
+            run(session);
+            return null;
+        }
+    }
+
+    public <R, T extends Exception> R execute(Callback<R,T> callback) throws T;
+
+    interface RecordKey {
+        
     }
     
     /**
@@ -29,177 +71,56 @@
      */
     public interface Session {
 
+        public class DuplicateKeyException extends Exception {
+            private static final long serialVersionUID = 1L;
+
+            public DuplicateKeyException(String message) {
+                super(message);
+            }
+        }
+
+        public class QueueNotFoundException extends Exception {
+            private static final long serialVersionUID = 1L;
+
+            public QueueNotFoundException(String message) {
+                super(message);
+            }
+        }
+
         // Message related methods.
         public RecordKey messageAdd(AsciiBuffer messageId, Buffer message);
-
         public RecordKey messageGetKey(AsciiBuffer messageId);
-
         public Buffer messageGet(RecordKey key);
 
         // Message Chunking related methods.
         public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message);
-
         public void messageChunkAdd(RecordKey key, Buffer message);
-
         public void messageChunkClose(RecordKey key);
-
         public Buffer messageChunkGet(RecordKey key, int offset, int max);
 
         // / Queue related methods.
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer first, int max);
-
+        public Iterator<AsciiBuffer> queueList(AsciiBuffer first);
         public void queueAdd(AsciiBuffer queue);
-
         public boolean queueRemove(AsciiBuffer queue);
-
-        public void queueAddMessage(AsciiBuffer queue, RecordKey key);
-
-        public void queueRemoveMessage(AsciiBuffer queue, RecordKey key);
-
-        public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
+        public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException;
+        public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException;
+        public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
 
         // We could use this to associate additional data to a message on a
-        // queue like
-        // which consumer a message has been dispatched to.
-        public void queueSetMessageAttachment(AsciiBuffer queue, RecordKey key, Buffer attachment);
+        // queue like which consumer a message has been dispatched to.
+        // public void queueSetMessageAttachment(AsciiBuffer queue, RecordKey
+        // key, Buffer attachment) throws QueueNotFoundException;
 
-        public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey key);
+        // public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey
+        // key) throws QueueNotFoundException;
 
         // / Simple Key Value related methods could come in handy to store misc
         // data.
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
-
         public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value);
-
         public Buffer mapGet(AsciiBuffer map, Buffer key);
-
         public Buffer mapRemove(AsciiBuffer map, Buffer key);
-
         public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max);
 
     }
-
-    /**
-     * This interface is used to execute transacted code.
-     * 
-     * It is used by the {@link Store#execute(Callback)} method, often as
-     * anonymous class.
-     */
-    public interface Operation {
-
-        /**
-         * Gets called by the {@link Store#add(Operation, ISourceController, boolean)} method within a
-         * transactional context. If any exception is thrown including Runtime
-         * exception, the transaction is rolled back.
-         * 
-         * @param session
-         *            provides you access to read and update the persistent
-         *            data.
-         * @return the result of the CallableCallback
-         * @throws CancellationException
-         *             if the operation has been canceled. If this is thrown, 
-         *             the {@link #onCommit()} and {@link #onRollback()} methods will
-         *             not be called. 
-         * @throws Exception
-         *             if an system error occured while executing the operations.
-         * @throws RuntimeException
-         *             if an system error occured while executing the operations.
-         */
-        public void execute(Session session) throws CancellationException, Exception, RuntimeException;
-        
-        /**
-         * Returns true if this operation can be delayed. This is useful in cases 
-         * where external events can negate the need to execute the operation. The delay
-         * interval is not guaranteed to be honored, if subsequent events or other 
-         * store flush policy/criteria requires a flush of subsequent events. 
-         * 
-         * @return True if the operation can be delayed. 
-         */
-        public boolean isDelayable();
-        
-        /**
-         * Attempts to cancel the store operation. Returns true if the operation
-         * could be canceled or false if the operation was already executed by the 
-         * store. 
-         * 
-         * @return true if the operation could be canceled
-         */
-        public boolean cancel();
-        
-        /**
-         * Returns the size to be used when calculating how much space this operation
-         * takes on the store processing queue. 
-         * 
-         * @return The limiter size to be used. 
-         */
-        public long getLimiterSize();
-        
-        /**
-         * Called after {@link #execute(Session)} is called and the the operation has been committed. 
-         */
-        public void onCommit();
-        
-        /**
-         * Called after {@link #execute(Session)} is called and the the operation has been rolled back. 
-         */
-        public void onRollback(Throwable error);
-    }
-    
-    /**
-     * This is a convenience base class that can be used to implement Operations.  
-     * It handles operation cancellation for you. 
-     */
-    public abstract class OperationBase implements Operation {
-        final private AtomicBoolean executePending = new AtomicBoolean(true);
-        
-        public boolean cancel() {
-            return executePending.compareAndSet(true, false);
-        }
-
-        public void execute(Session session) throws CancellationException {
-            if( executePending.compareAndSet(true, false) ) {
-                doExcecute(session);
-            } else {
-                throw new CancellationException();
-            }
-        }
-
-        abstract protected void doExcecute(Session session);
-        
-        public long getLimiterSize() {
-            return 0;
-        }
-
-        public boolean isDelayable() {
-            return false;
-        }
-
-        public void onCommit() {
-        }
-
-        public void onRollback() {
-        }
-    }
-
-    /**
-     * Executes user supplied {@link Operation}. If the {@link Operation} does not
-     * throw any Exceptions, all updates to the store are committed, otherwise
-     * they are rolled back. Any exceptions thrown by the {@link Operation} are
-     * propagated by this method.
-     * 
-     * If limiter space on the store processing queue is exceeded, the controller will be
-     * blocked. 
-     * 
-     * If this method is called with flush set to <code>false</false> there is no 
-     * guarantee made about when the operation will be executed. If <code>flush</code> is 
-     * <code>true</code> and {@link Operation#isDelayable()} is also <code>true</code>
-     * then an attempt will be made to execute the event at the {@link Store}'s configured
-     * delay interval. 
-     * 
-     * @param op The operation to execute
-     * @param flush Whether or not this operation needs immediate processing. 
-     * @param controller the source of the operation. 
-     */
-    public void add(Operation op, ISourceController<?> controller, boolean flush);
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Fri Mar 20 16:15:43 2009
@@ -197,13 +197,13 @@
         }
     }
 
-    public void beginTransaction(ConnectionContext context) throws IOException {
+    public void beginTransaction() throws IOException {
         throw new IOException("Not yet implemented.");
     }
-    public void commitTransaction(ConnectionContext context) throws IOException {
+    public void commitTransaction() throws IOException {
         throw new IOException("Not yet implemented.");
     }
-    public void rollbackTransaction(ConnectionContext context) throws IOException {
+    public void rollbackTransaction() throws IOException {
         throw new IOException("Not yet implemented.");
     }
     

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=756565&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Fri Mar 20 16:15:43 2009
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+
+
+public class MemoryStore implements Store {
+
+    MemorySession session = new MemorySession();
+
+    private class MemorySession implements Session {
+        private HashMap<RecordKey, Buffer> messages = new HashMap<RecordKey, Buffer>();
+        private HashMap<AsciiBuffer, TreeMap<RecordKey, Buffer>> queues = new HashMap<AsciiBuffer, TreeMap<RecordKey, Buffer>>();
+
+        // private HashMap<String, LinkedList<RecordKey>> queues = new
+        // HashMap<String, LinkedList<RecordKey>>();
+
+        public void beginTx() {
+
+        }
+
+        public void commitTx() {
+
+        }
+
+        public void rollback() {
+            throw new UnsupportedOperationException();
+        }
+
+        // //////////////////////////////////////////////////////////////////////////////
+        // Message related methods.
+        // ///////////////////////////////////////////////////////////////////////////////
+        public RecordKey messageAdd(AsciiBuffer messageId, Buffer message) {
+            RecordKey key = new MemoryRecordKey(messageId);
+            messages.put(key, message);
+            return key;
+        }
+
+        public Buffer messageGet(RecordKey key) {
+            return messages.get(key);
+        }
+ 
+        public RecordKey messageGetKey(AsciiBuffer messageId) {
+            MemoryRecordKey key = new MemoryRecordKey(messageId);
+            return messages.containsKey(key) ? key : null;
+        }
+
+        // //////////////////////////////////////////////////////////////////////////////
+        // Queue related methods.
+        // ///////////////////////////////////////////////////////////////////////////////
+        public void queueAdd(AsciiBuffer queue) {
+            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
+            if (messages == null) {
+                messages = new TreeMap<RecordKey, Buffer>();
+                queues.put(queue, messages);
+            }
+        }
+
+        public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException {
+            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
+            if (messages != null) {
+                if (messages.put(key, attachment) != null) {
+                    throw new DuplicateKeyException("");
+                }
+            } else {
+                throw new QueueNotFoundException(queue.toString());
+            }
+        }
+
+        public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException {
+            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
+            if (messages != null) {
+                messages.remove(key);
+            } else {
+                throw new QueueNotFoundException(queue.toString());
+            }
+        }
+
+        public Iterator<AsciiBuffer> queueList(AsciiBuffer first) {
+            ArrayList<AsciiBuffer> list = new ArrayList<AsciiBuffer>(queues.size());
+            for (AsciiBuffer queue : queues.keySet()) {
+                list.add(queue);
+            }
+            return list.iterator();
+        }
+
+        public Iterator<Buffer> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max) {
+            ArrayList<Buffer> list = new ArrayList<Buffer>(max);
+            TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+            if (messages != null) {
+                for (RecordKey key : messages.tailMap(firstRecord).keySet() ) {
+                    list.add(messages.get(key));
+                    if (list.size() == max) {
+                        break;
+                    }
+                }
+            }
+            return list.iterator();
+        }
+
+        public boolean queueRemove(AsciiBuffer queue) {
+            TreeMap<RecordKey, Buffer> messages = queues.get(queue.toString());
+            if (messages != null) {
+                Iterator<RecordKey> msgKeys = messages.keySet().iterator();
+                while (msgKeys.hasNext()) {
+                    RecordKey msgKey = msgKeys.next();
+                    try {
+                        queueRemoveMessage(queue, msgKey);
+                    } catch (QueueNotFoundException e) {
+                        // Can't happen.
+                    }
+                }
+                queues.remove(queue.toString());
+
+                return true;
+            }
+            return false;
+        }
+
+        /*
+         * public void queueUpdateMessageAttachment(AsciiBuffer queue, RecordKey
+         * key, Buffer attachment) { // TODO Auto-generated method stub }
+         * 
+         * public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey
+         * key) throws QueueNotFoundException { TreeMap<RecordKey, Buffer>
+         * messages = queues.get(queue); if (messages != null) {
+         * messages.add(key); } else { throw new
+         * QueueNotFoundException(queue.toString()); } }
+         */
+
+        // //////////////////////////////////////////////////////////////////////////////
+        // Simple Key Value related methods could come in handy to store misc
+        // data.
+        // ///////////////////////////////////////////////////////////////////////////////
+        public Buffer mapGet(AsciiBuffer map, Buffer key) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Iterator<Buffer> mapListKeys(AsciiBuffer map, Buffer first, int max) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Buffer mapRemove(AsciiBuffer map, Buffer key) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Buffer mapSet(AsciiBuffer map, Buffer key, Buffer value) {
+            throw new UnsupportedOperationException();
+        }
+
+        // ///////////////////////////////////////////////////////////////////////////////
+        // Message Chunking related methods
+        // ///////////////////////////////////////////////////////////////////////////////
+        public void messageChunkAdd(RecordKey key, Buffer message) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void messageChunkClose(RecordKey key) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Buffer messageChunkGet(RecordKey key, int offset, int max) {
+            throw new UnsupportedOperationException();
+        }
+
+        public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer txid, Buffer message) {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    final private class MemoryRecordKey implements RecordKey {
+        final AsciiBuffer messageId;
+
+        MemoryRecordKey(AsciiBuffer messageId) {
+            this.messageId = messageId;
+        }
+        
+        @Override
+        public int hashCode() {
+            return messageId.hashCode();
+        }
+        
+        @Override
+        public boolean equals(Object obj) {
+            if( obj == null || obj.getClass()!=MemoryRecordKey.class )
+                return false;
+            if( this == obj )
+                return true;
+            MemoryRecordKey key = (MemoryRecordKey)obj;
+            return messageId.equals(key.messageId);
+        }
+    }
+
+    public <R, T extends Exception> R execute(Callback<R, T> callback) throws T {
+        return callback.execute(session);
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/journal-data.proto Fri Mar 20 16:15:43 2009
@@ -106,3 +106,21 @@
   required int32 log_id = 1;  
   required int32 offset = 2;  
 }
+
+message KahaQueueDef {
+	enum DestinationType {
+	    EXCLUSIVE = 0
+	    SHARED = 1
+  	}
+  	
+	required string name = 1;
+	required int64 id = 2
+	optional int64 size = 3;
+	optional int64 save_extent = 4;
+	optional int64 resume_threshold = 5;
+}
+
+message KahaUndelMessageRecord {
+	required int64 message_id = 1;
+	required int64 queue_id = 2;
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=756565&r1=756564&r2=756565&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Fri Mar 20 16:15:43 2009
@@ -164,7 +164,7 @@
         outboundLimiter = createProtocolLimiter(true, outboundFlow, outputWindowSize, outputResumeThreshold);
 
         if (transport.narrow(DispatchableTransport.class) == null) {
-            blockingTransport = false;
+            blockingTransport = true;
             blockingWriter = Executors.newSingleThreadExecutor();
         }
 
@@ -408,7 +408,8 @@
             this.flow = flow;
         }
 
-        protected void remove(int size) {
+        @Override
+        public void remove(long size) {
             super.remove(size);
             if (!clientMode) {
                 available += size;