You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/22 10:49:56 UTC

svn commit: r540493 [2/2] - in /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server: ./ messageStore/ queue/ txn/ util/

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java Tue May 22 01:49:53 2007
@@ -1,270 +1,300 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.messageStore;
-
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
-
-import javax.transaction.xa.Xid;
-import java.util.Collection;
-
-/**
- * Created by Arnaud Simon
- * Date: 29-Mar-2007
- * Time: 17:34:02
- */
-public interface MessageStore
-{
-    /**
-     * Create a new exchange
-     *
-     * @param exchange the exchange to be persisted
-     * @throws InternalErrorException If an error occurs
-     */
-    public void createExchange(Exchange exchange)
-            throws
-            InternalErrorException;
-
-     /**
-     * Remove an exchange
-     * @param exchange  The exchange to be removed
-     * @throws InternalErrorException If an error occurs
-     */
-     public void removeExchange(Exchange exchange) throws
-            InternalErrorException;
-
-    /**
-     * Bind a queue with an exchange given a routing key
-     *
-     * @param exchange   The exchange to bind the queue with
-     * @param routingKey The routing key
-     * @param queue      The queue to be bound
-     * @param args       Args
-     * @throws InternalErrorException If an error occurs
-     */
-    public void bindQueue(Exchange exchange,
-                          AMQShortString routingKey,
-                          StorableQueue queue, FieldTable args)
-            throws
-            InternalErrorException;
-
-      /**
-     * Unbind a queue from an exchange
-     *
-     * @param exchange   The exchange the queue was bound to
-     * @param routingKey The routing queue
-     * @param queue      The queue to unbind
-     * @param args       args
-     * @throws InternalErrorException If an error occurs
-     */
-    public void unbindQueue(Exchange exchange,
-                            AMQShortString routingKey,
-                            StorableQueue queue, FieldTable args)
-            throws
-            InternalErrorException;
-    
-    /**
-     * Called after instantiation in order to configure the message store. A particular implementation can define
-     * whatever parameters it wants.
-     *
-     * @param virtualHost The virtual host using by this store
-     * @param tm          The transaction manager implementation
-     * @param base        The base element identifier from which all configuration items are relative. For example, if the base
-     *                    element is "store", the all elements used by concrete classes will be "store.foo" etc.
-     * @param config      The apache commons configuration object
-     * @throws InternalErrorException   If an error occurs that means the store is unable to configure itself
-     * @throws IllegalArgumentException If the configuration arguments are illegal
-     */
-    void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
-            throws
-            InternalErrorException,
-            IllegalArgumentException;
-
-    /**
-     * Called to close and cleanup any resources used by the message store.
-     *
-     * @throws InternalErrorException if close fails
-     */
-    void close()
-            throws
-            InternalErrorException;
-
-    /**
-     * Create a queue
-     *
-     * @param queue the queue to be created
-     * @throws InternalErrorException      In case of internal message store problem
-     * @throws QueueAlreadyExistsException If the queue already exists in the store
-     */
-    public void createQueue(StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueAlreadyExistsException;
-
-    /**
-     * Destroy a queue
-     *
-     * @param queue The queue to be destroyed
-     * @throws InternalErrorException    In case of internal message store problem
-     * @throws QueueDoesntExistException If the queue does not exist in the store
-     */
-    public void destroyQueue(StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueDoesntExistException;
-
-    /**
-     * Stage the message before effective enqueue
-     *
-     * @param m The message to stage
-     * @throws InternalErrorException        In case of internal message store problem
-     * @throws MessageAlreadyStagedException If the message is already staged
-     */
-    public void stage(StorableMessage m)
-            throws
-            InternalErrorException,
-            MessageAlreadyStagedException;
-
-
-    /**
-     * Append more data with a previously staged message
-     *
-     * @param m      The message to which data must be appended
-     * @param data   Data to happen to the message
-     * @param offset The number of bytes from the beginning of the payload
-     * @param size   The number of bytes to be written
-     * @throws InternalErrorException      In case of internal message store problem
-     * @throws MessageDoesntExistException If the message has not been staged
-     */
-    public void appendContent(StorableMessage m, byte[] data, int offset, int size)
-            throws
-            InternalErrorException,
-            MessageDoesntExistException;
-
-    /**
-     * Get the content of previously staged or enqueued message.
-     * The message headers are also set.
-     *
-     * @param m      The message for which the content must be loaded
-     * @param offset The number of bytes from the beginning of the payload
-     * @param size   The number of bytes to be loaded
-     * @return The message content
-     * @throws InternalErrorException      In case of internal message store problem
-     * @throws MessageDoesntExistException If the message does not exist
-     */
-    public byte[] loadContent(StorableMessage m, int offset, int size)
-            throws
-            InternalErrorException,
-            MessageDoesntExistException;
-
-    /**
-     * Destroy a previously staged message
-     *
-     * @param m the message to be destroyed
-     * @throws InternalErrorException      In case of internal message store problem
-     * @throws MessageDoesntExistException If the message does not exist in the store
-     */
-    public void destroy(StorableMessage m)
-            throws
-            InternalErrorException,
-            MessageDoesntExistException;
-
-    /**
-     * Enqueue a message under the scope of the transaction branch
-     * identified by xid when specified.
-     * <p> This operation is propagated to the queue and the message.
-     * <p> A message that has been previously staged is assumed to have had
-     * its payload already added (see appendContent)
-     *
-     * @param xid   The xid of the transaction branch under which the message must be enqueued.
-     *              <p> It he xid is null then the message is enqueued outside the scope of any transaction.
-     * @param m     The message to be enqueued
-     * @param queue The queue into which the message must be enqueued
-     * @throws InternalErrorException      In case of internal message store problem
-     * @throws QueueDoesntExistException   If the queue does not exist in the store
-     * @throws InvalidXidException         The transaction branch is invalid
-     * @throws UnknownXidException         The transaction branch is unknown
-     * @throws MessageDoesntExistException If the Message does not exist
-     */
-    public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueDoesntExistException,
-            InvalidXidException,
-            UnknownXidException,
-            MessageDoesntExistException;
-
-    /**
-     * Dequeue a message under the scope of the transaction branch identified by xid
-     * if specified.
-     * <p> This operation is propagated to the queue and the message.
-     *
-     * @param xid   The xid of the transaction branch under which the message must be dequeued.
-     *              <p> It he xid is null then the message is dequeued outside the scope of any transaction.
-     * @param m     The message to be dequeued
-     * @param queue The queue from which the message must be dequeued
-     * @throws InternalErrorException    In case of internal message store problem
-     * @throws QueueDoesntExistException If the queue does not exist in the store
-     * @throws InvalidXidException       The transaction branch is invalid
-     * @throws UnknownXidException       The transaction branch is unknown
-     */
-    public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueDoesntExistException,
-            InvalidXidException,
-            UnknownXidException;
-
-    //=========================================================
-    // Recovery specific methods
-    //=========================================================
-
-    /**
-     * List all the persistent queues
-     *
-     * @return All the persistent queues
-     * @throws InternalErrorException In case of internal message store problem
-     */
-    public Collection<StorableQueue> getAllQueues()
-            throws
-            InternalErrorException;
-
-    /**
-     * All enqueued messages of a given queue
-     *
-     * @param queue The queue where the message are retrieved from
-     * @return The list all enqueued messages of a given queue
-     * @throws InternalErrorException In case of internal message store problem
-     */
-    public Collection<StorableMessage> getAllMessages(StorableQueue queue)
-            throws
-            InternalErrorException;
-
-    /**
-     * Get a new message ID
-     *
-     * @return A new message ID
-     */
-    public long getNewMessageId();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:34:02
+ */
+public interface MessageStore
+{
+    /**
+     * Create a new exchange
+     *
+     * @param exchange the exchange to be persisted
+     * @throws InternalErrorException If an error occurs
+     */
+    public void createExchange(Exchange exchange)
+            throws
+            InternalErrorException;
+
+    /**
+     * Remove an exchange
+     *
+     * @param exchange The exchange to be removed
+     * @throws InternalErrorException If an error occurs
+     */
+    public void removeExchange(Exchange exchange)
+            throws
+            InternalErrorException;
+
+    /**
+     * Bind a queue with an exchange given a routing key
+     *
+     * @param exchange   The exchange to bind the queue with
+     * @param routingKey The routing key
+     * @param queue      The queue to be bound
+     * @param args       Args
+     * @throws InternalErrorException If an error occurs
+     */
+    public void bindQueue(Exchange exchange,
+                          AMQShortString routingKey,
+                          StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException;
+
+    /**
+     * Unbind a queue from an exchange
+     *
+     * @param exchange   The exchange the queue was bound to
+     * @param routingKey The routing key
+     * @param queue      The queue to unbind
+     * @param args       args
+     * @throws InternalErrorException If an error occurs
+     */
+    public void unbindQueue(Exchange exchange,
+                            AMQShortString routingKey,
+                            StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException;
+
+    /**
+     * Called after instantiation in order to configure the message store. A particular implementation can define
+     * whatever parameters it wants.
+     *
+     * @param virtualHost The virtual host using this store
+     * @param tm          The transaction manager implementation
+     * @param base        The base element identifier from which all configuration items are relative. For example, if the base
+     *                    element is "store", then all elements used by concrete classes will be "store.foo" etc.
+     * @param config      The apache commons configuration object
+     * @throws InternalErrorException   If an error occurs that means the store is unable to configure itself
+     * @throws IllegalArgumentException If the configuration arguments are illegal
+     */
+    void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+            throws
+            InternalErrorException,
+            IllegalArgumentException;
+
+    /**
+     * Called to close and cleanup any resources used by the message store.
+     *
+     * @throws InternalErrorException if close fails
+     */
+    void close()
+            throws
+            InternalErrorException;
+
+    /**
+     * Create a queue
+     *
+     * @param queue the queue to be created
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws QueueAlreadyExistsException If the queue already exists in the store
+     */
+    public void createQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueAlreadyExistsException;
+
+    /**
+     * Destroy a queue
+     *
+     * @param queue The queue to be destroyed
+     * @throws InternalErrorException    In case of internal message store problem
+     * @throws QueueDoesntExistException If the queue does not exist in the store
+     */
+    public void destroyQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException;
+
+    /**
+     * Stage the message before effective enqueue
+     *
+     * @param m The message to stage
+     * @throws InternalErrorException        In case of internal message store problem
+     * @throws MessageAlreadyStagedException If the message is already staged
+     */
+    public void stage(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageAlreadyStagedException;
+
+
+    /**
+     * Append more data with a previously staged message
+     *
+     * @param m      The message to which data must be appended
+     * @param data   Data to append to the message
+     * @param offset The number of bytes from the beginning of the payload
+     * @param size   The number of bytes to be written
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message has not been staged
+     */
+    public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Get the content of previously staged or enqueued message.
+     * The message headers are also set.
+     *
+     * @param m      The message for which the content must be loaded
+     * @param offset The number of bytes from the beginning of the payload
+     * @param size   The number of bytes to be loaded
+     * @return The message content
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message does not exist
+     */
+    public byte[] loadContent(StorableMessage m, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Get the content header of this message
+     *
+     * @param m      The message 
+     * @return The content header of the message
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message does not exist
+     */
+    public ContentHeaderBody getContentHeaderBody(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Get the MessagePublishInfo of this message
+     *
+     * @param m      The message
+     * @return The MessagePublishInfo of the message
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message does not exist
+     */
+      public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Destroy a previously staged message
+     *
+     * @param m the message to be destroyed
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message does not exist in the store
+     */
+    public void destroy(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Enqueue a message under the scope of the transaction branch
+     * identified by xid when specified.
+     * <p> This operation is propagated to the queue and the message.
+     * <p> A message that has been previously staged is assumed to have had
+     * its payload already added (see appendContent)
+     *
+     * @param xid   The xid of the transaction branch under which the message must be enqueued.
+     *              <p> If the xid is null then the message is enqueued outside the scope of any transaction.
+     * @param m     The message to be enqueued
+     * @param queue The queue into which the message must be enqueued
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws QueueDoesntExistException   If the queue does not exist in the store
+     * @throws InvalidXidException         The transaction branch is invalid
+     * @throws UnknownXidException         The transaction branch is unknown
+     * @throws MessageDoesntExistException If the Message does not exist
+     */
+    public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException;
+
+    /**
+     * Dequeue a message under the scope of the transaction branch identified by xid
+     * if specified.
+     * <p> This operation is propagated to the queue and the message.
+     *
+     * @param xid   The xid of the transaction branch under which the message must be dequeued.
+     *              <p> If the xid is null then the message is dequeued outside the scope of any transaction.
+     * @param m     The message to be dequeued
+     * @param queue The queue from which the message must be dequeued
+     * @throws InternalErrorException    In case of internal message store problem
+     * @throws QueueDoesntExistException If the queue does not exist in the store
+     * @throws InvalidXidException       The transaction branch is invalid
+     * @throws UnknownXidException       The transaction branch is unknown
+     */
+    public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException;
+
+    //=========================================================
+    // Recovery specific methods
+    //=========================================================
+
+    /**
+     * List all the persistent queues
+     *
+     * @return All the persistent queues
+     * @throws InternalErrorException In case of internal message store problem
+     */
+    public Collection<StorableQueue> getAllQueues()
+            throws
+            InternalErrorException;
+
+    /**
+     * All enqueued messages of a given queue
+     *
+     * @param queue The queue from which the messages are retrieved
+     * @return The list of all enqueued messages of the given queue
+     * @throws InternalErrorException In case of internal message store problem
+     */
+    public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+            throws
+            InternalErrorException;
+
+    /**
+     * Get a new message ID
+     *
+     * @return A new message ID
+     */
+    public long getNewMessageId();
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java Tue May 22 01:49:53 2007
@@ -0,0 +1,135 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import java.util.ArrayList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 15-May-2007
+ * Time: 10:11:49
+ */
+public abstract class Pool
+{
+    // The maximum size of the pool.
+    private int _maxPoolSize = -1;
+    // The current size of the pool.
+    private int _currentPoolSize = 0;
+    // The pool objects.
+    private volatile ArrayList<Object> _poolObjects = new ArrayList<Object>();
+    //The current number of created instances.
+    private int _instanceCount = 0;
+
+    /**
+     * Create a pool of specified size. Negative or zero pool sizes are
+     * disallowed.
+     *
+     * @param poolSize      The size of the pool to create. Should be 1 or
+     *                      greater.
+     * @throws Exception    If the pool size is less than 1.
+     */
+    public Pool(int poolSize) throws Exception
+    {
+        if (poolSize <= 0)
+        {
+            throw new Exception("pool size is less than 1: " + poolSize);
+        }
+        _maxPoolSize = poolSize;
+    }
+
+    /**
+     * Return the maximum size of this pool.
+     *
+     * @return  The maximum size of this pool.
+     */
+    public final int maxSize()
+    {
+        return _maxPoolSize;
+    }
+
+    /**
+     * Return the current number of created instances.
+     *
+     * @return The current number of created instances in this pool.
+     */
+    public final int numberOfInstances()
+    {
+        return _instanceCount;
+    }
+
+    /**
+     * Extending classes MUST define how to create an instance of the object
+     * that they pool.
+     *
+     * @return              An instance of the pooled object.
+     * @throws Exception    In case of internal error.
+     */
+    abstract protected Object createInstance() throws Exception;
+
+    /**
+     * Remove the next available object from the pool or wait for one to become
+     * available.
+     *
+     * @return           The next available instance.
+     * @throws Exception If the call is interrupted
+     */
+    public final synchronized Object acquireInstance() throws Exception
+    {
+        while (_currentPoolSize == _maxPoolSize)
+        {
+            try
+            {
+                this.wait();
+            }
+            catch (InterruptedException e)
+            {
+                  throw new Exception("pool wait threw interrupted exception", e);
+            }
+        }
+        if (_poolObjects.size() == 0)
+        {
+            _poolObjects.add(createInstance());
+            _instanceCount++;
+        }
+        _currentPoolSize++;
+        return _poolObjects.remove(0);
+    }
+
+    /**
+     * Return an object back into this pool.
+     *
+     * @param object    The returning object.
+     */
+    public synchronized void releaseInstance(Object object)
+    {
+        _poolObjects.add(object);
+        _currentPoolSize--;
+        this.notify();
+    }
+
+    /**
+     * Report that an acquired instance is dead and will not be returned: decrements the instance and in-use counts and notifies one waiter.
+     *
+     */
+    public synchronized void releaseDeadInstance()
+    {
+       _instanceCount--;
+       _currentPoolSize--;
+        this.notify();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java Tue May 22 01:49:53 2007
@@ -5,9 +5,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,7 +24,11 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.MessageDoesntExistException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.log4j.Logger;
 
 import javax.transaction.xa.Xid;
@@ -65,7 +69,7 @@
     // MessagePublishInfo
     private MessagePublishInfo _messagePublishInfo;
     // list of chunks
-    private List<ContentChunk> _chunks = new LinkedList<ContentChunk>();
+    private List<ContentChunk> _chunks;
 
     //========================================================================
     // Constructors
@@ -84,6 +88,17 @@
             throws
             AMQException
     {
+        if (_contentHeaderBody == null)
+        {
+            // load it from the store
+            try
+            {
+                _contentHeaderBody = _messageStore.getContentHeaderBody(_message);
+            } catch (Exception e)
+            {
+                throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+            }
+        }
         return _contentHeaderBody;
     }
 
@@ -91,6 +106,17 @@
             throws
             AMQException
     {
+       if (_chunks == null )
+       {
+           if(_message.isStaged() )
+           {
+              loadChunks();
+           }
+           else
+           {
+               return 0;
+           }
+      }
         return _chunks.size();
     }
 
@@ -106,13 +132,57 @@
             IllegalArgumentException,
             AMQException
     {
+        if (_chunks == null)
+        {
+            loadChunks();
+        }
         return _chunks.get(index);
     }
 
+    /**
+     * Loads this message's content from the message store and caches it in
+     * <code>_chunks</code> as a single chunk wrapping the returned bytes.
+     * <p>
+     * The cache is only published once the load has succeeded, so a failed load
+     * leaves <code>_chunks</code> untouched and a later call can retry instead of
+     * seeing an empty list.
+     *
+     * @throws AMQException with AMQConstant.INTERNAL_ERROR if the content cannot be loaded
+     */
+    private void loadChunks()
+            throws
+            AMQException
+    {
+        // build the list locally; it only becomes visible through _chunks on success
+        final List<ContentChunk> loaded = new LinkedList<ContentChunk>();
+        try
+        {
+            final byte[] underlying = _messageStore.loadContent(_message, 1, 0);
+            final int size = underlying.length;
+            final org.apache.mina.common.ByteBuffer data =
+                    org.apache.mina.common.ByteBuffer.wrap(underlying);
+            loaded.add(new ContentChunk()
+            {
+                public int getSize()
+                {
+                    return size;
+                }
+
+                public org.apache.mina.common.ByteBuffer getData()
+                {
+                    return data;
+                }
+
+                public void reduceToFit()
+                {
+                    // nothing to do: the buffer wraps exactly the loaded bytes
+                }
+            });
+        }
+        catch (Exception e)
+        {
+            throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+        }
+        _chunks = loaded;
+    }
+
     public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
             throws
             AMQException
     {
+        if (_chunks == null)
+        {
+            _chunks = new LinkedList<ContentChunk>();
+        }
         _chunks.add(contentBody);
         // if rquired this message can be added to the store
         //_messageStore.appendContent(_message, _payload, 0, 10);
@@ -123,6 +193,17 @@
             throws
             AMQException
     {
+        if (_messagePublishInfo == null)
+        {
+            // read it from the store
+            try
+            {
+                _messagePublishInfo = _messageStore.getMessagePublishInfo(_message);
+            } catch (Exception e)
+            {
+                throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+            }
+        }
         return _messagePublishInfo;
     }
 

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java Tue May 22 01:49:53 2007
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.UnknownXidException;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Base class for JDBC transaction records: holds the message and the queue a
+ * record refers to and exposes their identifiers and the record type.
+ * <p>
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 15:15:18
+ */
+public abstract class JDBCAbstractRecord implements TransactionRecord
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(JDBCAbstractRecord.class);
+    // The record types
+    static public final int TYPE_DEQUEUE = 1;
+    static public final int TYPE_ENQUEUE = 2;
+
+    // the queue
+    StorableQueue _queue;
+    // the message
+    StorableMessage _message;
+
+    //========================================================================
+    // Constructor
+    //========================================================================
+
+    /**
+     * Create a record for the given message and queue.
+     *
+     * @param m     The message this record refers to
+     * @param queue The queue this record refers to
+     */
+    public JDBCAbstractRecord(StorableMessage m, StorableQueue queue)
+    {
+        _queue = queue;
+        _message = m;
+    }
+
+    /**
+     * Get the type of this record.
+     *
+     * @return The record type as reported by the concrete subclass
+     *         (see <code>TYPE_DEQUEUE</code> and <code>TYPE_ENQUEUE</code>)
+     */
+    public abstract int getType();
+
+    /**
+     * Get the id of the message of this record.
+     *
+     * @return The value of <code>getMessageId()</code> of the underlying message
+     */
+    public long getMessageID()
+    {
+        return _message.getMessageId();
+    }
+
+    /**
+     * Get the id of the queue of this record.
+     *
+     * @return The value of <code>getQueueID()</code> of the underlying queue
+     */
+    public int getQueueID()
+    {
+        return _queue.getQueueID();
+    }
+
+    /**
+     * Does nothing in this base class; see {@link #rollback(MessageStore, Xid)}
+     * for the xid-aware variant.
+     *
+     * @param store The message store (unused)
+     * @throws InternalErrorException Declared by the signature, never thrown here
+     */
+    public void rollback(MessageStore store)
+            throws
+            InternalErrorException
+    {
+    }
+
+    /**
+     * Does nothing in this base class; see {@link #prepare(MessageStore, Xid)}
+     * for the xid-aware variant.
+     *
+     * @param store The message store (unused)
+     * @throws InternalErrorException Declared by the signature, never thrown here
+     */
+    public void prepare(MessageStore store)
+            throws
+            InternalErrorException
+    {
+    }
+
+    /**
+     * Rollback this record on behalf of the given xid.
+     *
+     * @param store The message store
+     * @param xid   The xid of the transaction branch
+     * @throws InternalErrorException In case of internal problem
+     * @throws UnknownXidException    The xid is unknown
+     */
+    public abstract void rollback(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            UnknownXidException;
+
+    /**
+     * Prepare this record on behalf of the given xid.
+     *
+     * @param store The message store
+     * @param xid   The xid of the transaction branch
+     * @throws InternalErrorException In case of internal problem
+     * @throws UnknownXidException    The xid is unknown
+     */
+    public abstract void prepare(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            UnknownXidException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java Tue May 22 01:49:53 2007
@@ -0,0 +1,85 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:50:34
+ */
+public class JDBCDequeueRecord extends JDBCAbstractRecord
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(JDBCDequeueRecord.class);
+
+    //========================================================================
+    // Constructor
+    //========================================================================
+
+    /**
+     * Create a dequeue record for the given message and queue.
+     *
+     * @param m     The message to dequeue
+     * @param queue The queue the message is dequeued from
+     */
+    public JDBCDequeueRecord(StorableMessage m, StorableQueue queue)
+    {
+        super(m, queue);
+    }
+
+    //========================================================================
+    // Interface TransactionRecord
+    //========================================================================
+
+    /**
+     * Commit this record: dequeue the message from the queue under the given xid.
+     *
+     * @param store The message store performing the dequeue
+     * @param xid   The xid of the transaction branch
+     */
+    public void commit(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        store.dequeue(xid, _message, _queue);
+    }
+
+    /**
+     * Rollback this record by delegating to the JDBC store.
+     *
+     * @param store The message store; it is cast to <code>JDBCStore</code>
+     * @param xid   The xid of the transaction branch
+     */
+    public void rollback(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            UnknownXidException
+    {
+        final JDBCStore jdbcStore = (JDBCStore) store;
+        jdbcStore.rollbackDequeu(xid, _message, _queue);
+    }
+
+    /**
+     * Prepare this record by delegating to the JDBC store.
+     *
+     * @param store The message store; it is cast to <code>JDBCStore</code>
+     * @param xid   The xid of the transaction branch
+     */
+    public void prepare(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            UnknownXidException
+    {
+        final JDBCStore jdbcStore = (JDBCStore) store;
+        jdbcStore.prepareDequeu(xid, _message, _queue);
+    }
+
+    /**
+     * @return <code>TYPE_DEQUEUE</code>
+     */
+    public int getType()
+    {
+        return TYPE_DEQUEUE;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java Tue May 22 01:49:53 2007
@@ -0,0 +1,106 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:50:20
+ */
+public class JDBCEnqueueRecord extends JDBCAbstractRecord
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(JDBCEnqueueRecord.class);
+
+    //========================================================================
+    // Constructor
+    //========================================================================
+
+    /**
+     * Create an enqueue record for the given message and queue.
+     *
+     * @param m     The message to enqueue
+     * @param queue The queue the message is enqueued in
+     */
+    public JDBCEnqueueRecord(StorableMessage m, StorableQueue queue)
+    {
+        super(m, queue);
+    }
+
+    //========================================================================
+    // Interface TransactionRecord
+    //========================================================================
+
+    /**
+     * Commit this record: enqueue the message in the queue under the given xid.
+     *
+     * @param store The message store performing the enqueue
+     * @param xid   The xid of the transaction branch
+     */
+    public void commit(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        store.enqueue(xid, _message, _queue);
+    }
+
+    /**
+     * Rollback this record: a message that is not enqueued is destroyed in the
+     * store, an enqueued message is left alone.
+     *
+     * @param store The message store
+     * @param xid   The xid of the transaction branch (not used here)
+     * @throws InternalErrorException If the message cannot be destroyed
+     */
+    public void rollback(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            UnknownXidException
+    {
+        if (_message.isEnqueued())
+        {
+            // still referenced by a queue: nothing to delete
+            return;
+        }
+        // try to delete the message
+        try
+        {
+            store.destroy(_message);
+        }
+        catch (Exception e)
+        {
+            throw new InternalErrorException("Problem when destoying message ", e);
+        }
+    }
+
+    /**
+     * Prepare this record: a message that is neither enqueued nor staged is
+     * staged and its data appended to the store.
+     *
+     * @param store The message store
+     * @param xid   The xid of the transaction branch (not used here)
+     * @throws InternalErrorException If the message cannot be persisted
+     */
+    public void prepare(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            UnknownXidException
+    {
+        try
+        {
+            if (_message.isEnqueued() || _message.isStaged())
+            {
+                // already known to the store
+                return;
+            }
+            store.stage(_message);
+            store.appendContent(_message, _message.getData(), 0, _message.getData().length);
+        }
+        catch (Exception e)
+        {
+            throw new InternalErrorException("Problem when persisting message ", e);
+        }
+    }
+
+    /**
+     * @return <code>TYPE_ENQUEUE</code>
+     */
+    public int getType()
+    {
+        return TYPE_ENQUEUE;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java Tue May 22 01:49:53 2007
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.JDBCStore;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:09:35
+ */
+public class JDBCTransaction implements Transaction
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(JDBCTransaction.class);
+    // The next xid id to hand out; new transactions take their id through nextXidId().
+    // NOTE(review): the field is public, so code outside this class may also write it;
+    // such writes are not covered by the lock used in nextXidId() - confirm against callers.
+    public static long _xidId = 0;
+    //========================================================================
+    // Instance Fields
+    //========================================================================
+    // the associated connection
+    private JDBCStore.MyConnection _connection;
+    // Indicates whether this transaction is prepared
+    private boolean _prepared = false;
+    // Indicates that this transaction has heuristically rolled back
+    private boolean _heurRollBack = false;
+    // The list of records associated with this tx
+    private List<TransactionRecord> _records = new LinkedList<TransactionRecord>();
+    // The date when this tx has been created.
+    private long _dateCreated;
+    // The timeout in seconds
+    private long _timeout;
+    // this instance xid id used as primary key
+    private long _thisxidId;
+
+    //=========================================================
+    // Constructors
+    //=========================================================
+
+    /**
+     * Create a transaction, stamping it with the current time and the next xid id.
+     */
+    public JDBCTransaction()
+    {
+        _dateCreated = System.currentTimeMillis();
+        _thisxidId = nextXidId();
+    }
+
+    /**
+     * Allocate the next xid id.
+     * <p>
+     * The read-and-increment of the shared counter is done under the class lock so
+     * that two transactions created concurrently cannot receive the same id.
+     *
+     * @return The id to be used by the transaction being created
+     */
+    private static synchronized long nextXidId()
+    {
+        return _xidId++;
+    }
+
+    //=========================================================
+    // Getter and Setter methods
+    //=========================================================
+
+    /**
+     * Notify that this tx has been prepared
+     */
+    public void prepare()
+    {
+        _prepared = true;
+    }
+
+    /**
+     * Specify whether this transaction is prepared
+     *
+     * @return true if this transaction is prepared, false otherwise
+     */
+    public boolean isPrepared()
+    {
+        return _prepared;
+    }
+
+    /**
+     * Notify that this tx has been heuristically rolled back
+     */
+    public void heurRollback()
+    {
+        _heurRollBack = true;
+    }
+
+    /**
+     * Specify whether this transaction has been heuristically rolled back
+     *
+     * @return true if this transaction has been heuristically rolled back, false otherwise
+     */
+    public boolean isHeurRollback()
+    {
+        return _heurRollBack;
+    }
+
+    /**
+     * Add an abstract record to this tx.
+     *
+     * @param record The record to be added
+     */
+    public void addRecord(TransactionRecord record)
+    {
+        _records.add(record);
+    }
+
+    /**
+     * Get the list of records associated with this tx.
+     *
+     * @return The list of records associated with this tx (the internal list, not a copy).
+     */
+    public List<TransactionRecord> getrecords()
+    {
+        return _records;
+    }
+
+    /**
+     * Set this tx timeout
+     *
+     * @param timeout This tx timeout in seconds
+     */
+    public void setTimeout(long timeout)
+    {
+        _timeout = timeout;
+    }
+
+    /**
+     * Get this tx timeout
+     *
+     * @return This tx timeout in seconds
+     */
+    public long getTimeout()
+    {
+        return _timeout;
+    }
+
+    /**
+     * Specify whether this tx has expired, i.e. whether more than the timeout
+     * (converted to milliseconds) has elapsed since this tx was created.
+     *
+     * @return true if this tx has expired, false otherwise
+     */
+    public boolean hasExpired()
+    {
+        long currentDate = System.currentTimeMillis();
+        boolean result = currentDate - _dateCreated > _timeout * 1000;
+        if (_log.isDebugEnabled() && result)
+        {
+            _log.debug("transaction has expired");
+        }
+        return result;
+    }
+
+    /**
+     * Get the JDBC connection
+     *
+     * @return The JDBC connection, or null if none is currently set
+     */
+    public JDBCStore.MyConnection getConnection()
+    {
+        return _connection;
+    }
+
+    /**
+     * Set the JDBC connection
+     *
+     * @param connection The new JDBC connection
+     */
+    public void setConnection(JDBCStore.MyConnection connection)
+    {
+        _connection = connection;
+    }
+
+    /**
+     * This tx xid id used as primary key
+     *
+     * @return this tx xid id
+     */
+    public long getXidID()
+    {
+        return _thisxidId;
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java Tue May 22 01:49:53 2007
@@ -0,0 +1,554 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.HashMap;
+import java.util.Set;
+
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:05:45
+ */
+public class JDBCTransactionManager implements TransactionManager
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(JDBCTransactionManager.class);
+
+    private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout";
+
+    //========================================================================
+    // Instance Fields
+    //========================================================================
+    // The underlying jdbc message store
+    private JDBCStore _messagStore;
+
+    // A map of XID/x
+    private HashMap<Xid, Transaction> _xidMap;
+
+    // A map of in-doubt txs
+    private HashMap<Xid, Transaction> _indoubtXidMap;
+
+    // A default tx timeout in sec
+    private int _defaultTimeout;   // set to 120s if not specified in the config
+
+    //===================================
+    //=== Configuration
+    //===================================
+
+    /**
+     * Configure this TM with the Message store implementation
+     *
+     * @param base         The base element identifier from which all configuration items are relative. For example, if the base
+     *                     element is "store", the all elements used by concrete classes will be "store.foo" etc.
+     * @param config       The apache commons configuration object
+     * @param messageStroe the message store associated with the TM
+     */
+    public void configure(MessageStore messageStroe, String base, Configuration config)
+    {
+        _messagStore = (JDBCStore) messageStroe;
+        // the timeout defaults to 120 s, with or without a configuration object
+        if (config != null)
+        {
+            _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 120);
+        } else
+        {
+            _defaultTimeout = 120;
+        }
+        _log.info("Using transaction timeout of " + _defaultTimeout + " s");
+        // get the list of in-doubt transactions
+        try
+        {
+            _indoubtXidMap = _messagStore.getAllInddoubt();
+            // both fields deliberately reference the same map instance, as before
+            _xidMap = _indoubtXidMap;
+        } catch (Exception e)
+        {
+            _log.fatal("Cannot recover in-doubt transactions", e);
+        }
+        if (_xidMap == null)
+        {
+            // recovery failed or returned nothing: start from an empty map so that
+            // later calls such as begin() do not fail with a NullPointerException
+            _indoubtXidMap = new HashMap<Xid, Transaction>();
+            _xidMap = _indoubtXidMap;
+        }
+    }
+
+    //===================================
+    //=== TransactionManager interface
+    //===================================
+
+    /**
+     * Begin a transaction branch identified by Xid
+     *
+     * @param xid The xid of the branch to begin
+     * @return <ul>
+     *         <li>  <code>XAFlag.ok</code>: Normal execution.
+     *         </ul>
+     * @throws InternalErrorException In case of internal problem
+     * @throws InvalidXidException    The Xid is invalid
+     */
+    public synchronized XAFlag begin(Xid xid)
+            throws
+            InternalErrorException,
+            InvalidXidException
+    {
+        // a branch cannot be started without an xid
+        if (xid == null)
+        {
+            throw new InvalidXidException(xid, "null xid");
+        }
+        // nor can an xid that is already tracked be started again
+        final boolean alreadyKnown = _xidMap.containsKey(xid);
+        if (alreadyKnown)
+        {
+            throw new InvalidXidException(xid, "Xid already exist");
+        }
+        // register a fresh transaction carrying the default timeout
+        final Transaction branch = new JDBCTransaction();
+        branch.setTimeout(_defaultTimeout);
+        _xidMap.put(xid, branch);
+        return XAFlag.ok;
+    }
+
+    /**
+     * Prepare the transaction branch identified by Xid
+     *
+     * @param xid The xid of the branch to prepare
+     * @return <ul>
+     *         <li> <code>XAFlag.ok</code>: Normal execution.
+     *         <li> <code>XAFlag.rdonly</code>: The transaction branch was read-only and has been committed.
+     *         <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspecified reason.
+     *         <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+     *         </ul>
+     * @throws InternalErrorException  In case of internal problem
+     * @throws CommandInvalidException Prepare has been called in an improper context
+     * @throws UnknownXidException     The Xid is unknown
+     */
+    public synchronized XAFlag prepare(Xid xid)
+            throws
+            InternalErrorException,
+            CommandInvalidException,
+            UnknownXidException
+    {
+        // get the transaction
+        JDBCTransaction tx = getTransaction(xid);
+        XAFlag result = XAFlag.ok;
+        if (tx.isHeurRollback())
+        {
+            // a heuristically rolled back branch is reported as read-only
+            result = XAFlag.rdonly;
+        } else if (tx.hasExpired())
+        {
+            result = XAFlag.rbtimeout;
+            // rollback this tx branch
+            rollback(xid);
+        } else
+        {
+            if (tx.isPrepared())
+            {
+                throw new CommandInvalidException("TransactionImpl is already prepared");
+            }
+            if (tx.getrecords().isEmpty())
+            {
+                // the tx was read only (no work has been done)
+                _xidMap.remove(xid);
+                result = XAFlag.rdonly;
+            } else
+            {
+                try
+                {
+                    JDBCStore.MyConnection con = _messagStore.getConnection();
+                    tx.setConnection(con);
+                    // save the xid
+                    _messagStore.saveXID(con, tx, xid);
+                    for (TransactionRecord record : tx.getrecords())
+                    {
+                        if (record instanceof JDBCAbstractRecord)
+                        {
+                            // JDBC records are prepared with the xid and then saved in the store
+                            ((JDBCAbstractRecord) record).prepare(_messagStore, xid);
+                            _messagStore.saveRecord(con, tx, (JDBCAbstractRecord) record);
+                        } else
+                        {
+                            record.prepare(_messagStore);
+                        }
+                    }
+                    _messagStore.commitConnection(con);
+                    tx.setConnection(null);
+                } catch (Exception e)
+                {
+                    // keep the original failure as the cause so that it is not lost
+                    // NOTE(review): the connection obtained above is neither committed nor
+                    // detached from the tx on this path - confirm how JDBCStore reclaims it.
+                    _log.error("Cannot prepare tx: " + xid, e);
+                    throw new InternalErrorException("Cannot prepare tx: " + xid, e);
+                }
+                // only mark the tx as prepared once everything has been persisted
+                tx.prepare();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Rollback the transaction branch identified by Xid
+     *
+     * @param xid The xid of the branch to rollback
+     * @return <ul>
+     *         <li> <code>XAFlag.ok</code>: Normal execution,
+     *         <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+     *         <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was committed.
+     *         <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was rolled back.
+     *         <li> <code>NOT SUPPORTED XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was partially committed and partially rolled back.
+     *         <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspecified reason.
+     *         <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+     *         </ul>
+     * @throws InternalErrorException  In case of internal problem
+     * @throws CommandInvalidException Rollback has been called in an improper context
+     * @throws UnknownXidException     The Xid is unknown
+     */
+    public synchronized XAFlag rollback(Xid xid)
+            throws
+            InternalErrorException,
+            CommandInvalidException,
+            UnknownXidException
+    {
+        // get the transaction
+        JDBCTransaction tx = getTransaction(xid);
+        XAFlag flag = XAFlag.ok;
+        if (tx.isHeurRollback())
+        {
+            // nothing left to undo: only report the heuristic outcome
+            flag = XAFlag.heurrb;
+        } else
+        {
+            try
+            {
+                JDBCStore.MyConnection con = _messagStore.getConnection();
+                tx.setConnection(con);
+                for (TransactionRecord record : tx.getrecords())
+                {
+                    if (record instanceof JDBCAbstractRecord)
+                    {
+                        ((JDBCAbstractRecord) record).rollback(_messagStore, xid);
+                    } else
+                    {
+                        record.rollback(_messagStore);
+                    }
+                }
+                if (tx.isPrepared())
+                {
+                    // a prepared tx has its records and xid saved in the store: remove them
+                    _messagStore.deleteRecords(con, tx);
+                    _messagStore.deleteXID(con, tx);
+                }
+                // commit exactly once, whether or not the tx was prepared
+                _messagStore.commitConnection(con);
+                tx.setConnection(null);
+            }
+            catch (Exception e)
+            {
+                // this should not happen
+                _log.error("Error when rolling back distributed transaction: " + xid, e);
+                throw new InternalErrorException("Error when rolling back distributed transaction: " + xid, e);
+            }
+            removeTransaction(xid);
+        }
+        // an expired tx is always reported as timed out, overriding the flag set above
+        if (tx.hasExpired())
+        {
+            flag = XAFlag.rbtimeout;
+        }
+        return flag;
+    }
+
+    /**
+     * Commit the transaction branch identified by Xid
+     *
+     * @param xid The xid of the branch to commit
+     * @return <ul>
+     *         <li> <code>XAFlag.ok</code>: Normal execution,
+     *         <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+     *         <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was committed.
+     *         <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was rolled back.
+     *         <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was partially committed and partially rolled back.
+     *         <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long; the branch has been rolled back.
+     *         </ul>
+     * @throws InternalErrorException  In case of internal problem
+     * @throws CommandInvalidException Commit has been called in an improper context
+     * @throws UnknownXidException     The Xid is unknown
+     * @throws org.apache.qpid.server.exception.NotPreparedException
+     *                                 The branch was not prepared prior to commit
+     */
+    public synchronized XAFlag commit(Xid xid)
+            throws
+            InternalErrorException,
+            CommandInvalidException,
+            UnknownXidException,
+            NotPreparedException
+    {
+        // Look up the branch; getTransaction throws UnknownXidException when the xid is not registered.
+        JDBCTransaction tx = getTransaction(xid);
+        XAFlag flag = XAFlag.ok;
+        if (tx.isHeurRollback())
+        {
+            // Already heuristically rolled back: nothing to commit, only report it.
+            flag = XAFlag.heurrb;
+        } else if (tx.hasExpired())
+        {
+            flag = XAFlag.rbtimeout;
+            // rollback this tx branch
+            rollback(xid);
+        } else
+        {
+            if (!tx.isPrepared())
+            {
+                throw new NotPreparedException("TransactionImpl is not prepared");
+            }
+            try
+            {
+                JDBCStore.MyConnection con = _messagStore.getConnection();
+                tx.setConnection(con);
+                for (TransactionRecord record : tx.getrecords())
+                {
+                    try
+                    {
+                        record.commit(_messagStore, xid);
+                    } catch (InvalidXidException e)
+                    {
+                        throw new UnknownXidException(xid, e);
+                    } catch (Exception e)
+                    {
+                        // this should not happen as the queue and the message must exist
+                        // Keep going with the remaining records, but report a mixed outcome
+                        // and log the cause so that the failure can be diagnosed.
+                        _log.error("Error when committing distributed transaction heurmix mode returned: " + xid, e);
+                        flag = XAFlag.heurmix;
+                    }
+                }
+                // Delete the branch's records and xid through the store, then commit the connection.
+                _messagStore.deleteRecords(con, tx);
+                _messagStore.deleteXID(con, tx);
+                _messagStore.commitConnection(con);
+                tx.setConnection(null);
+            } catch (UnknownXidException e)
+            {
+                // Let the declared exception reach the caller instead of being
+                // re-wrapped as InternalErrorException by the generic handler below.
+                throw e;
+            } catch (Exception e)
+            {
+                // this should not happen
+                _log.error("Error when committing distributed transaction heurrb mode returned: " + xid);
+                throw new InternalErrorException("Error when committing distributed transaction: " + xid, e);
+            }
+            // Only reached when the commit completed: forget the branch.
+            removeTransaction(xid);
+        }
+        return flag;
+    }
+
+    /**
+     * One phase commit the transaction branch identified by Xid
+     *
+     * @param xid The xid of the branch to one phase commit
+     * @return <ul>
+     *         <li> <code>XAFlag.ok</code>: Normal execution,
+     *         <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+     *         <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was committed.
+     *         <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was rolled back.
+     *         <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the specified transaction branch was partially committed and partially rolled back.
+     *         <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspecified reason.
+     *         <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+     *         </ul>
+     * @throws InternalErrorException  In case of internal problem
+     * @throws CommandInvalidException Commit has been called in an improper context
+     * @throws UnknownXidException     The Xid is unknown
+     */
+    public synchronized XAFlag commit_one_phase(Xid xid)
+            throws
+            InternalErrorException,
+            CommandInvalidException,
+            UnknownXidException
+    {
+        XAFlag flag = XAFlag.ok;
+        // Look up the branch; getTransaction throws UnknownXidException when the xid is not registered.
+        JDBCTransaction tx = getTransaction(xid);
+        if (tx.isHeurRollback())
+        {
+            // Already heuristically rolled back: nothing to commit, only report it.
+            flag = XAFlag.heurrb;
+        } else if (tx.hasExpired())
+        {
+            flag = XAFlag.rbtimeout;
+            // rollback this tx branch
+            rollback(xid);
+        } else
+        {
+            try
+            {
+                // Mark the branch as prepared in memory before committing its records.
+                // NOTE(review): the earlier comment claimed no prepare was needed, yet
+                // prepare() is invoked - confirm what the records rely on.
+                tx.prepare();
+                JDBCStore.MyConnection con = _messagStore.getConnection();
+                tx.setConnection(con);
+                for (TransactionRecord record : tx.getrecords())
+                {
+                    try
+                    {
+                        record.commit(_messagStore, xid);
+                    } catch (InvalidXidException e)
+                    {
+                        throw new UnknownXidException(xid, e);
+                    } catch (Exception e)
+                    {
+                        // this should not happen as the queue and the message must exist
+                        // Keep going with the remaining records, but report a mixed outcome
+                        // and log the cause so that the failure can be diagnosed.
+                        _log.error("Error when committing transaction heurmix mode returned: " + xid, e);
+                        flag = XAFlag.heurmix;
+                    }
+                }
+                _messagStore.commitConnection(con);
+                tx.setConnection(null);
+            } catch (UnknownXidException e)
+            {
+                // Let the declared exception reach the caller instead of being
+                // re-wrapped as InternalErrorException by the generic handler below.
+                throw e;
+            } catch (Exception e)
+            {
+                // Report through the logger rather than printing the stack trace to stderr.
+                _log.error("cannot commit transaxtion with xid " + xid, e);
+                throw new InternalErrorException("cannot commit transaxtion with xid " + xid + " " + e, e);
+            }
+            finally
+            {
+                // The branch is forgotten whether the one phase commit succeeded or failed.
+                removeTransaction(xid);
+            }
+        }
+        return flag;
+    }
+
+    /**
+     * Forget about the transaction branch identified by Xid
+     *
+     * @param xid The xid of the branch to forget
+     * @throws InternalErrorException  In case of internal problem
+     * @throws CommandInvalidException Forget has been called in an improper context
+     * @throws UnknownXidException     The Xid is unknown
+     */
+    public void forget(Xid xid)
+            throws
+            InternalErrorException,
+            CommandInvalidException,
+            UnknownXidException
+    {
+        // NOTE(review): the monitor used here is the caller-supplied xid object, whereas
+        // rollback/commit synchronize on this manager - confirm this is intended.
+        synchronized (xid)
+        {
+            // Called purely as an existence check: throws UnknownXidException
+            // when no branch is registered for this xid.
+            getTransaction(xid);
+            // Drop the branch from the manager's maps.
+            removeTransaction(xid);
+        }
+    }
+
+    /**
+     * Set the transaction branch timeout value in seconds
+     *
+     * @param xid     The xid of the branch to set timeout
+     * @param timeout Timeout value in seconds
+     * @throws InternalErrorException In case of internal problem
+     * @throws UnknownXidException    The Xid is unknown
+     */
+    public void setTimeout(Xid xid, long timeout)
+            throws
+            InternalErrorException,
+            UnknownXidException
+    {
+        // Resolve the branch (UnknownXidException if absent) and forward the new timeout to it.
+        getTransaction(xid).setTimeout(timeout);
+    }
+
+    /**
+     * Get the transaction branch timeout
+     *
+     * @param xid The xid of the branch to get the timeout from
+     * @return The timeout associated with the branch identified with xid
+     * @throws InternalErrorException In case of internal problem
+     * @throws UnknownXidException    The Xid is unknown
+     */
+    public long getTimeout(Xid xid)
+            throws
+            InternalErrorException,
+            UnknownXidException
+    {
+        // Resolve the branch (UnknownXidException if absent) and report the timeout it holds.
+        return getTransaction(xid).getTimeout();
+    }
+
+    /**
+     * Get a set of Xids the RM has prepared or heuristically completed
+     *
+     * @param startscan Indicates that recovery scan should start
+     * @param endscan   Indicates that the recovery scan should end after returning the Xids
+     * @return Set of Xids the RM has prepared or  heuristically completed
+     * @throws InternalErrorException  In case of internal problem
+     * @throws CommandInvalidException Recover has been called in an improper context
+     */
+    public Set<Xid> recover(boolean startscan, boolean endscan)
+            throws
+            InternalErrorException,
+            CommandInvalidException
+    {
+        // Return a snapshot rather than the live key set of the in-doubt map: a live view
+        // would let callers remove entries from the manager's internal state and would
+        // change under them whenever a branch is committed, rolled back or forgotten.
+        // The startscan/endscan flags are not used: every in-doubt xid is returned each time.
+        return new java.util.HashSet<Xid>(_indoubtXidMap.keySet());
+    }
+
+    /**
+     * An error happened (for example the channel has been abruptly closed)
+     * with this Xid, the TM must make a heuristic decision.
+     *
+     * @param xid The Xid of the transaction branch to be heuristically completed
+     * @throws UnknownXidException    The Xid is unknown
+     * @throws InternalErrorException In case of internal problem
+     */
+    public void HeuristicOutcome(Xid xid)
+            throws
+            UnknownXidException,
+            InternalErrorException
+    {
+        // NOTE(review): the monitor used here is the caller-supplied xid object, whereas
+        // rollback/commit synchronize on this manager - confirm this is intended.
+        synchronized (xid)
+        {
+            final JDBCTransaction branch = getTransaction(xid);
+            final boolean alreadyPrepared = branch.isPrepared();
+            if (!alreadyPrepared)
+            {
+                // A branch that was never prepared is rolled back on the spot and flagged as
+                // heuristically rolled back.
+                // NOTE(review): unlike rollback(Xid), every record goes through the
+                // single-argument rollback here - confirm this is intended.
+                for (TransactionRecord pending : branch.getrecords())
+                {
+                    pending.rollback(_messagStore);
+                }
+                branch.heurRollback();
+            }
+            // Prepared or not, the branch is now recorded as in doubt.
+            _indoubtXidMap.put(xid, branch);
+        }
+    }
+
+
+    public JDBCTransaction getTransaction(Xid xid)
+            throws
+            UnknownXidException
+    {
+        // Resolve the xid against the map of registered branches.
+        final Transaction registered = _xidMap.get(xid);
+        if (registered != null)
+        {
+            return (JDBCTransaction) registered;
+        }
+        // No branch registered under this xid.
+        throw new UnknownXidException(xid);
+    }
+
+    //==========================================================================
+    //== Methods for Message Store
+    //==========================================================================
+
+    /**
+     * Get the default tx timeout in seconds
+     *
+     * @return the default tx timeout in seconds
+     */
+    public int getDefaultTimeout()
+    {
+        // Plain accessor for the manager's default timeout field.
+        return this._defaultTimeout;
+    }
+    //==========================================================================
+    //== Private Methods
+    //==========================================================================
+
+    private void removeTransaction(Xid xid)
+    {
+        // Forget the branch in both maps: the map of registered branches
+        // and the map of in-doubt branches.
+        this._xidMap.remove(xid);
+        this._indoubtXidMap.remove(xid);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue May 22 01:49:53 2007
@@ -93,7 +93,10 @@
     {
         try
         {
-            message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+            if( ! deliverFirst )
+            {
+                message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+            }
             queue.process(_storeContext, message, deliverFirst);
             //following check implements the functionality
             //required by the 'immediate' flag:

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Tue May 22 01:49:53 2007
@@ -57,10 +57,14 @@
         super(new MapConfiguration(new HashMap()));
     }
 
-    public void initialise() throws Exception
+    public void initialise()
+            throws
+            Exception
     {
         _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore");
         _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.MemoryTransactionManager");
+       // _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.JDBCStore");
+       // _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.JDBCTransactionManager");
 
         Properties users = new Properties();
 
@@ -119,5 +123,6 @@
         return _accessManager;
     }
 }
+