You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/22 10:49:56 UTC

svn commit: r540493 [1/2] - in /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server: ./ messageStore/ queue/ txn/ util/

Author: gsim
Date: Tue May 22 01:49:53 2007
New Revision: 540493

URL: http://svn.apache.org/viewvc?view=rev&rev=540493
Log:
Patch from Arnaud Simon (asimon@redhat.com) in connection with QPID-496. This adds a JDBC based message store implementation.


Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue May 22 01:49:53 2007
@@ -473,7 +473,7 @@
                 unacked.message.setRedelivered(true);
 
                 // Deliver Message
-                deliveryContext.deliver(unacked.message, unacked.queue, false);
+                deliveryContext.deliver(unacked.message, unacked.queue, true);
 
                 // Should we allow access To the DM to directy deliver the message?
                 // As we don't need to check for Consumers or worry about incrementing the message count?

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java Tue May 22 01:49:53 2007
@@ -0,0 +1,1759 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.*;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.sql.*;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 15-May-2007
+ * Time: 09:59:12
+ */
+public class JDBCStore implements MessageStore
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(JDBCStore.class);
+    // the database connection pool
+    public static ConnectionPool _connectionPool = null;
+    // the prepared statements
+    //==== IMPORTANT: remember to update if we add more prepared statements!
+    private static final int CREATE_EXCHANGE = 0;
+    private static final int DELETE_EXCHANGE = 1;
+    private static final int BIND_QUEUE = 2;
+    private static final int UNBIND_QUEUE = 3;
+    private static final int CREATE_QUEUE = 4;
+    private static final int DELETE_QUEUE = 5;
+    private static final int STAGE_MESSAGE = 6;
+    private static final int UPDATE_MESSAGE_PAYLOAD = 7;
+    private static final int SELECT_MESSAGE_PAYLOAD = 8;
+    private static final int DELETE_MESSAGE = 9;
+    private static final int ENQUEUE = 10;
+    private static final int DEQUEUE = 11;
+    private static final int GET_ALL_QUEUES = 12;
+    private static final int GET_ALL_MESSAGES = 13;
+    private static final int SAVE_RECORD = 14;
+    private static final int SAVE_XID = 15;
+    private static final int DELETE_RECORD = 16;
+    private static final int DELETE_XID = 17;
+    private static final int UPDATE_QMR = 18;
+    private static final int GET_CONTENT_HEADER = 19;
+    private static final int GET_MESSAGE_INFO = 20;
+    //==== size:
+    private static final int STATEMENT_SIZE = 21;
+    //========================================================================
+    // field properties
+    //========================================================================
+    //The default URL
+    protected String _connectionURL = "jdbc:derby:derbyDB;create=true";
+    // The default driver
+    private String _driver = "org.apache.derby.jdbc.EmbeddedDriver";
+    // The pool max size
+    private int _maxSize = 40;
+    // The tables
+    // the table containing the messages
+    private String _tableNameMessage = "MessageTable";
+    private String _tableNameQueue = "QueueTable";
+    private String _tableNameQueueMessageRelation = "QeueMessageRelation";
+    private String _tableNameExchange = "Exchange";
+    private String _tableNameExchangeQueueRelation = "ExchangeQueueRelation";
+    private String _tableNameTransaction = "TransactionTable";
+    private String _tableNameRecord = "RecordTable";
+
+    // The transaction maanger
+    private JDBCTransactionManager _tm;
+    // the message ID
+    private long _messageID = 0;
+    // the virtual host
+    private VirtualHost _virtualHost;
+    // indicate whether this store is recovering
+    private boolean _recovering = false;
+    // the recovered queues
+    private HashMap<Integer, AMQQueue> _queueMap;
+
+    //========================================================================
+    // Interface MessageStore
+    //========================================================================
+    public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+            throws
+            InternalErrorException,
+            IllegalArgumentException
+    {
+        _log.info("Configuring Derby message store");
+        // the virtual host
+        _virtualHost = virtualHost;
+        // Specify that the tables must be dropped.
+        // If true then this means that recovery is not possible.
+        boolean dropTables = true;
+        if (config != null)
+        {
+            dropTables = config.getBoolean(base + "dropTables", false);
+            _driver = config.getString(base + "driver", _driver);
+            _connectionURL = config.getString(base + "connectionURL", _connectionURL);
+            _maxSize = config.getInt(base + "connectionPoolSize", 20);
+        }
+        if (dropTables)
+        {
+            _log.info("Dropping table of Derby message store");
+        }
+        if (!setupStore(dropTables))
+        {
+            _log.error("Error configuration of Derby store failed");
+            throw new InternalErrorException("Error configuration of Derby store failed");
+        }
+        // recovery
+        _recovering = true;
+        _queueMap = recover(); //==> recover the queues and the messages
+        // recreate the excahnges and bind the queues
+        recoverExchanges(_queueMap);
+        _recovering = false;
+        _tm = (JDBCTransactionManager) tm;
+        _tm.configure(this, "txn", config);
+        _queueMap.clear();
+        _queueMap = null;
+    }
+
+    public void close()
+            throws
+            InternalErrorException
+    {
+        // nothing has to be done
+    }
+
+    public void createExchange(Exchange exchange)
+            throws
+            InternalErrorException
+    {
+        if (!_recovering)
+        {
+            MyConnection connection = null;
+            try
+            {
+                connection = (MyConnection) _connectionPool.acquireInstance();
+                PreparedStatement pstmt = connection.getStatements()[CREATE_EXCHANGE];
+                if (pstmt == null)
+                {
+                    pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchange +
+                            " (Name,Type) VALUES (?,?)");
+                    connection.getStatements()[CREATE_EXCHANGE] = pstmt;
+                }
+                pstmt.setString(1, exchange.getName().asString());
+                pstmt.setString(2, exchange.getType().asString());
+                pstmt.executeUpdate();
+            } catch (Exception e)
+            {
+                throw new InternalErrorException("Cannot create Exchange: " + exchange);
+            } finally
+            {
+                if (connection != null)
+                {
+                    try
+                    {
+                        connection.getConnection().commit();
+                        _connectionPool.releaseInstance(connection);
+                    } catch (SQLException e)
+                    {
+                        // we did not manage to commit this connection
+                        // it is better to release it
+                        _connectionPool.releaseDeadInstance();
+                        throw new InternalErrorException("Cannot create Exchange: " + exchange);
+                    }
+                }
+            }
+        }
+    }
+
+    public void removeExchange(Exchange exchange)
+            throws
+            InternalErrorException
+    {
+        if (!_recovering)
+        {
+            MyConnection connection = null;
+            try
+            {
+                connection = (MyConnection) _connectionPool.acquireInstance();
+                PreparedStatement pstmt = connection.getStatements()[DELETE_EXCHANGE];
+                if (pstmt == null)
+                {
+                    pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchange +
+                            " WHERE Name = ?");
+                    connection.getStatements()[DELETE_EXCHANGE] = pstmt;
+                }
+                pstmt.setString(1, exchange.getName().asString());
+                pstmt.executeUpdate();
+            } catch (Exception e)
+            {
+                throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+            } finally
+            {
+                if (connection != null)
+                {
+                    try
+                    {
+                        connection.getConnection().commit();
+                        _connectionPool.releaseInstance(connection);
+                    } catch (SQLException e)
+                    {
+                        // we did not manage to commit this connection
+                        // it is better to release it
+                        _connectionPool.releaseDeadInstance();
+                        throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+                    }
+                }
+            }
+        }
+    }
+
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException
+    {
+        if (!_recovering)
+        {
+            MyConnection connection = null;
+            try
+            {
+                connection = (MyConnection) _connectionPool.acquireInstance();
+                PreparedStatement pstmt = connection.getStatements()[BIND_QUEUE];
+                if (pstmt == null)
+                {
+                    pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchangeQueueRelation +
+                            " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)");
+                    connection.getStatements()[BIND_QUEUE] = pstmt;
+                }
+                pstmt.setInt(1, queue.getQueueID());
+                pstmt.setString(2, exchange.getName().asString());
+                pstmt.setString(3, routingKey.asString());
+                if (args != null)
+                {
+                    pstmt.setBytes(4, args.getDataAsBytes());
+                } else
+                {
+                    pstmt.setBytes(4, null);
+                }
+                pstmt.executeUpdate();
+            } catch (Exception e)
+            {
+                throw new InternalErrorException("Cannot create Exchange: " + exchange);
+            } finally
+            {
+                if (connection != null)
+                {
+                    try
+                    {
+                        connection.getConnection().commit();
+                        _connectionPool.releaseInstance(connection);
+                    } catch (SQLException e)
+                    {
+                        // we did not manage to commit this connection
+                        // it is better to release it
+                        _connectionPool.releaseDeadInstance();
+                        throw new InternalErrorException("Cannot create Exchange: " + exchange);
+                    }
+                }
+            }
+        }
+    }
+
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            PreparedStatement pstmt = connection.getStatements()[UNBIND_QUEUE];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchangeQueueRelation +
+                        " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?");
+                connection.getStatements()[UNBIND_QUEUE] = pstmt;
+            }
+            pstmt.setInt(1, queue.getQueueID());
+            pstmt.setString(2, exchange.getName().asString());
+            pstmt.setString(3, routingKey.asString());
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+                }
+            }
+        }
+    }
+
+    public void createQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueAlreadyExistsException
+    {
+        MyConnection connection = null;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            PreparedStatement pstmt = connection.getStatements()[CREATE_QUEUE];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueue +
+                        " (QueueID,Name,Owner) VALUES (?,?,?)");
+                connection.getStatements()[CREATE_QUEUE] = pstmt;
+            }
+            pstmt.setInt(1, queue.getQueueID());
+            pstmt.setString(2, queue.getName().asString());
+            if (queue.getOwner() != null)
+            {
+                pstmt.setString(3, queue.getOwner().asString());
+            } else
+            {
+                pstmt.setString(3, null);
+            }
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot create Queue: " + queue);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot create Queue: " + queue);
+                }
+            }
+        }
+    }
+
+    public void destroyQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException
+    {
+        MyConnection connection = null;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            PreparedStatement pstmt = connection.getStatements()[DELETE_QUEUE];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueue +
+                        " WHERE QueueID = ?");
+                connection.getStatements()[DELETE_QUEUE] = pstmt;
+            }
+            pstmt.setInt(1, queue.getQueueID());
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot remove Queue: " + queue);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot remove Queue: " + queue);
+                }
+            }
+        }
+    }
+
+    public void stage(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageAlreadyStagedException
+    {
+        if (m.isStaged() || m.isEnqueued())
+        {
+            _log.error("Message with Id " + m.getMessageId() + " is already staged");
+            throw new MessageAlreadyStagedException("Message eith Id " + m.getMessageId() + " is already staged");
+        }
+        MyConnection connection = null;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            stage(connection, m);
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot stage Message: " + m);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot stage Message: " + m);
+                }
+            }
+        }
+    }
+
+    public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        // The message must have been staged
+        if (!m.isStaged())
+        {
+            _log.error("Cannot append content of message Id "
+                    + m.getMessageId() + " as it has not been staged");
+            throw new MessageDoesntExistException("Cannot append content of message Id "
+                    + m.getMessageId() + " as it has not been staged");
+        }
+        MyConnection connection = null;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            appendContent(connection, m, data, offset, size);
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot stage Message: " + m);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot stage Message: " + m);
+                }
+            }
+        }
+    }
+
+    public byte[] loadContent(StorableMessage m, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        MyConnection connection = null;
+        try
+        {
+            byte[] result;
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
+                        " WHERE MessageID = ? ");
+                connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
+            }
+            pstmt.setLong(1, m.getMessageId());
+            ResultSet rs = pstmt.executeQuery();
+            if (!rs.next())
+            {
+                throw new MessageDoesntExistException("Cannot load content of message Id "
+                        + m.getMessageId() + " as it has not been found");
+            }
+            Blob myBlob = rs.getBlob(1);
+
+            if (myBlob.length() > 0)
+            {
+                if (size == 0)
+                {
+                    result = myBlob.getBytes(offset, (int) myBlob.length());
+                } else
+                {
+                    result = myBlob.getBytes(offset, size);
+                }
+            } else
+            {
+                throw new MessageDoesntExistException("Cannot load content of message Id "
+                        + m.getMessageId() + " as it has not been found");
+            }
+            rs.close();
+            return result;
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot load Message: " + m);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot load Message: " + m);
+                }
+            }
+        }
+    }
+
+    public void destroy(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        MyConnection connection = null;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            destroy(connection, m);
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot destroy message: " + m);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot destroy message: " + m);
+                }
+            }
+        }
+    }
+
+    public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        MyConnection connection = null;
+        // Get the current tx
+        JDBCTransaction tx = getTx(xid);
+        // If this operation is transacted then we need to add a record
+        if (tx != null && !tx.isPrepared())
+        {
+            // add an enqueue record
+            tx.addRecord(new JDBCEnqueueRecord(m, queue));
+        } else
+        {
+            try
+            {
+                if (tx != null)
+                {
+                    connection = tx.getConnection();
+                } else
+                {
+                    connection = (MyConnection) _connectionPool.acquireInstance();
+                }
+                if (!m.isStaged() && !m.isEnqueued())
+                {
+                    //This is the first time this message is enqueued and it has not been staged.
+                    stage(connection, m);
+                    appendContent(connection, m, m.getData(), 0, m.getData().length);
+                }
+                PreparedStatement pstmt = connection.getStatements()[ENQUEUE];
+                if (pstmt == null)
+                {
+                    pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueueMessageRelation +
+                            " (QueueID,MessageID,Prepared) VALUES (?,?,0)");
+                    connection.getStatements()[ENQUEUE] = pstmt;
+                }
+                pstmt.setInt(1, queue.getQueueID());
+                pstmt.setLong(2, m.getMessageId());
+                pstmt.executeUpdate();
+                m.enqueue(queue);
+                queue.enqueue(m);
+            } catch (Exception e)
+            {
+                throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+            } finally
+            {
+                if (tx == null && connection != null)
+                {
+                    try
+                    {
+                        connection.getConnection().commit();
+                        _connectionPool.releaseInstance(connection);
+                    } catch (SQLException e)
+                    {
+                        // we did not manage to commit this connection
+                        // it is better to release it
+                        _connectionPool.releaseDeadInstance();
+                        throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+                    }
+                }
+            }
+        }
+    }
+
+    public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException
+    {
+        MyConnection connection = null;
+        // Get the current tx
+        JDBCTransaction tx = getTx(xid);
+        // If this operation is transacted then we need to add a record
+        if (tx != null && !tx.isPrepared())
+        {
+            // add an dequeue record
+            tx.addRecord(new JDBCDequeueRecord(m, queue));
+        } else
+        {
+            try
+            {
+                if (tx != null)
+                {
+                    connection = tx.getConnection();
+                } else
+                {
+                    connection = (MyConnection) _connectionPool.acquireInstance();
+                }
+                PreparedStatement pstmt = connection.getStatements()[DEQUEUE];
+                if (pstmt == null)
+                {
+                    pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueueMessageRelation +
+                            " WHERE QueueID = ? AND MessageID = ?");
+                    connection.getStatements()[DEQUEUE] = pstmt;
+                }
+                pstmt.setInt(1, queue.getQueueID());
+                pstmt.setLong(2, m.getMessageId());
+                pstmt.executeUpdate();
+                m.dequeue(queue);
+                if (!m.isEnqueued())
+                {
+                    // delete this message from persistence store
+                    destroy(connection, m);
+                }
+                queue.dequeue(m);
+            } catch (Exception e)
+            {
+                throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+            } finally
+            {
+                if (tx == null && connection != null)
+                {
+                    try
+                    {
+                        connection.getConnection().commit();
+                        _connectionPool.releaseInstance(connection);
+                    } catch (SQLException e)
+                    {
+                        // we did not manage to commit this connection
+                        // it is better to release it
+                        _connectionPool.releaseDeadInstance();
+                        throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+                    }
+                }
+            }
+        }
+    }
+
+    public Collection<StorableQueue> getAllQueues()
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        List<StorableQueue> result = new ArrayList<StorableQueue>();
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            PreparedStatement pstmt = connection.getStatements()[GET_ALL_QUEUES];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("SELECT * FROM " + _tableNameQueue);
+                connection.getStatements()[GET_ALL_QUEUES] = pstmt;
+            }
+            ResultSet rs = pstmt.executeQuery();
+            while (rs.next())
+            {
+                //the queue owner may be null
+                AMQShortString queueOwner = null;
+                if (rs.getString(3) != null)
+                {
+                    queueOwner = new AMQShortString(rs.getString(3));
+                }
+                result.add(new AMQQueue(new AMQShortString(rs.getString(2)), true, queueOwner,
+                        false, _virtualHost));
+            }
+            rs.close();
+            return result;
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot get all queues");
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot get all queues");
+                }
+            }
+        }
+    }
+
+    public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            return getAllMessages(connection, queue);
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot get all queues");
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot get all queues");
+                }
+            }
+        }
+    }
+
+    public HashMap<Xid, Transaction> getAllInddoubt()
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        HashMap<Xid, Transaction> result = new HashMap<Xid, Transaction>();
+        try
+        {
+            TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+            MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
+            // re-create all the tx
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            Statement stmt = connection.getConnection().createStatement();
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameTransaction);
+            JDBCTransaction foundTx;
+            Xid foundXid;
+            long foundXIDID;
+            while (rs.next())
+            {
+                // set the XID_ID
+                foundXIDID = rs.getLong(1);
+                if (foundXIDID > JDBCTransaction._xidId)
+                {
+                    JDBCTransaction._xidId = foundXIDID;
+                }
+                foundTx = new JDBCTransaction();
+                foundXid = new XidImpl(rs.getBlob(3).getBytes(1, (int) rs.getBlob(3).length()),
+                        rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length()));
+                // get all the records
+                Statement stmtr = connection.getConnection().createStatement();
+                ResultSet rsr = stmtr.executeQuery("SELECT * FROM " + _tableNameRecord +
+                        " WHERE XID_ID = " + rs.getLong(1));
+                int foundType;
+                AMQQueue foundQueue;
+                StorableMessage foundMessage;
+                TransactionRecord foundRecord;
+                while (rsr.next())
+                {
+                    // those messages were not recovered before so they need to be recreated
+                    foundType = rsr.getInt(2);
+                    foundQueue = _queueMap.get(new Integer(rsr.getInt(4)));
+                    foundMessage = new AMQMessage(rs.getLong(3), this, messageHandleFactory, txnContext);
+                    if (foundType == JDBCAbstractRecord.TYPE_DEQUEUE)
+                    {
+                        foundRecord = new JDBCDequeueRecord(foundMessage, foundQueue);
+                    } else
+                    {
+                        foundRecord = new JDBCEnqueueRecord(foundMessage, foundQueue);
+                    }
+                    foundTx.addRecord(foundRecord);
+                }
+                rsr.close();
+                // add this tx to the map
+                result.put(foundXid, foundTx);
+            }
+            rs.close();
+            return result;
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot recover: ", e);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot recover: ", e);
+                }
+            }
+        }
+    }
+
+
+    public long getNewMessageId()
+    {
+        return _messageID++;
+    }
+
+    //========================================================================
+    // Public methods
+    //========================================================================
+
+    public MyConnection getConnection()
+            throws
+            Exception
+    {
+        return (MyConnection) _connectionPool.acquireInstance();
+    }
+
+    public void commitConnection(MyConnection connection)
+            throws
+            InternalErrorException
+    {
+        try
+        {
+            connection.getConnection().commit();
+            _connectionPool.releaseInstance(connection);
+        } catch (SQLException e)
+        {
+            // we did not manage to commit this connection
+            // it is better to release it
+            _connectionPool.releaseDeadInstance();
+            throw new InternalErrorException("Cannot commit connection =");
+        }
+    }
+
+    public void rollbackConnection(MyConnection connection)
+            throws
+            InternalErrorException
+    {
+        try
+        {
+            connection.getConnection().rollback();
+            _connectionPool.releaseInstance(connection);
+        } catch (SQLException e)
+        {
+            // we did not manage to rollback this connection
+            // it is better to release it
+            _connectionPool.releaseDeadInstance();
+            throw new InternalErrorException("Cannot rollback connection");
+        }
+    }
+
+    public void appendContent(MyConnection connection, StorableMessage m, byte[] data, int offset, int size)
+            throws
+            SQLException,
+            MessageDoesntExistException
+    {
+        PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD];
+        if (pstmt == null)
+        {
+            pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
+                    " WHERE MessageID = ? ");
+            connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
+        }
+        pstmt.setLong(1, m.getMessageId());
+        ResultSet rs = pstmt.executeQuery();
+        if (!rs.next())
+        {
+            throw new MessageDoesntExistException("Cannot append content of message Id "
+                    + m.getMessageId() + " as it has not been found");
+        }
+        Blob myBlob = rs.getBlob(1);
+        byte[] oldPayload;
+        if (myBlob != null && myBlob.length() > 0)
+        {
+            oldPayload = myBlob.getBytes(1, (int) myBlob.length());
+        } else
+        {
+            oldPayload = new byte[0];
+        }
+        rs.close();
+        byte[] newPayload = new byte[oldPayload.length + size];
+        ByteBuffer buffer = ByteBuffer.wrap(newPayload);
+        buffer.put(oldPayload);
+        buffer.put(data, offset, size);
+        PreparedStatement pstmtUpdate = connection.getStatements()[UPDATE_MESSAGE_PAYLOAD];
+        if (pstmtUpdate == null)
+        {
+            pstmtUpdate = connection.getConnection().prepareStatement("UPDATE " + _tableNameMessage +
+                    " SET Payload = ? WHERE MessageID = ?");
+            connection.getStatements()[UPDATE_MESSAGE_PAYLOAD] = pstmtUpdate;
+        }
+        pstmtUpdate.setBytes(1, newPayload);
+        pstmtUpdate.setLong(2, m.getMessageId());
+        pstmtUpdate.executeUpdate();
+    }
+
+    public void stage(MyConnection connection, StorableMessage m)
+            throws
+            Exception
+    {
+        PreparedStatement pstmt = connection.getStatements()[STAGE_MESSAGE];
+        if (pstmt == null)
+        {
+            pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameMessage +
+                    " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)");
+            connection.getStatements()[STAGE_MESSAGE] = pstmt;
+        }
+        pstmt.setLong(1, m.getMessageId());
+        pstmt.setBytes(2, m.getHeaderBody());
+        pstmt.setString(3, ((AMQMessage) m).getMessagePublishInfo().getExchange().asString());
+        pstmt.setString(4, ((AMQMessage) m).getMessagePublishInfo().getRoutingKey().asString());
+        pstmt.setBoolean(5, ((AMQMessage) m).getMessagePublishInfo().isMandatory());
+        pstmt.setBoolean(6, ((AMQMessage) m).getMessagePublishInfo().isImmediate());
+        pstmt.executeUpdate();
+        m.staged();
+    }
+
+    public void saveRecord(MyConnection connection, JDBCTransaction tx, JDBCAbstractRecord record)
+            throws
+            InternalErrorException
+    {
+        try
+        {
+            PreparedStatement pstmt = connection.getStatements()[SAVE_RECORD];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameRecord +
+                        " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)");
+                connection.getStatements()[SAVE_RECORD] = pstmt;
+            }
+            pstmt.setLong(1, tx.getXidID());
+            pstmt.setInt(2, record.getType());
+            pstmt.setLong(3, record.getMessageID());
+            pstmt.setLong(4, record.getQueueID());
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot save record: " + record);
+        }
+    }
+
+    public void saveXID(MyConnection connection, JDBCTransaction tx, Xid xid)
+            throws
+            InternalErrorException
+    {
+        try
+        {
+            PreparedStatement pstmt = connection.getStatements()[SAVE_XID];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameTransaction +
+                        " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)");
+                connection.getStatements()[SAVE_XID] = pstmt;
+            }
+            pstmt.setLong(1, tx.getXidID());
+            pstmt.setInt(2, xid.getFormatId());
+            pstmt.setBytes(3, xid.getBranchQualifier());
+            pstmt.setBytes(4, xid.getGlobalTransactionId());
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot save xid: " + xid);
+        }
+    }
+
+    public void deleteRecords(MyConnection connection, JDBCTransaction tx)
+            throws
+            InternalErrorException
+    {
+        try
+        {
+            PreparedStatement pstmt = connection.getStatements()[DELETE_RECORD];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameRecord +
+                        " WHERE XID_ID = ?");
+                connection.getStatements()[DELETE_RECORD] = pstmt;
+            }
+            pstmt.setLong(1, tx.getXidID());
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot delete record: " + tx.getXidID());
+        }
+    }
+
+    public void deleteXID(MyConnection connection, JDBCTransaction tx)
+            throws
+            InternalErrorException
+    {
+        try
+        {
+            PreparedStatement pstmt = connection.getStatements()[DELETE_XID];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameTransaction +
+                        " WHERE XID_ID = ?");
+                connection.getStatements()[DELETE_XID] = pstmt;
+            }
+            pstmt.setLong(1, tx.getXidID());
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot delete xid: " + tx.getXidID());
+        }
+    }
+
+    public void prepareDequeu(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            UnknownXidException,
+            InternalErrorException
+    {
+        JDBCTransaction tx = getTx(xid);
+        if (tx == null)
+        {
+            throw new UnknownXidException(xid);
+        }
+        updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 1);
+
+    }
+
+    public void rollbackDequeu(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            UnknownXidException,
+            InternalErrorException
+    {
+        JDBCTransaction tx = getTx(xid);
+        if (tx == null)
+        {
+            throw new UnknownXidException(xid);
+        }
+        updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 0);
+    }
+
+    //========================================================================
+    // Private methods
+    //========================================================================
+
+
+    private void updateQueueMessageRelation(MyConnection connection,
+                                            int queueID, long messageId, int prepared)
+            throws
+            InternalErrorException
+    {
+        try
+        {
+            PreparedStatement pstmt = connection.getStatements()[UPDATE_QMR];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("UPDATE " + _tableNameQueueMessageRelation +
+                        " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?");
+                connection.getStatements()[UPDATE_QMR] = pstmt;
+            }
+            pstmt.setInt(1, prepared);
+            pstmt.setLong(2, messageId);
+            pstmt.setInt(3, queueID);
+            pstmt.executeUpdate();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot update QMR", e);
+        }
+
+    }
+
+    public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        MessagePublishInfo result;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            PreparedStatement pstmt = connection.getStatements()[GET_MESSAGE_INFO];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("SELECT ExchangeName, RoutingKey," +
+                        " Mandatory, Is_Immediate from " + _tableNameMessage +
+                        " WHERE MessageID = ?");
+                connection.getStatements()[GET_MESSAGE_INFO] = pstmt;
+            }
+            pstmt.setLong(1, m.getMessageId());
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next())
+            {
+                final AMQShortString exchange = new AMQShortString(rs.getString(1));
+                final AMQShortString routingKey = new AMQShortString(rs.getString(2));
+                final boolean mandatory = rs.getBoolean(3);
+                final boolean immediate = rs.getBoolean(4);
+                result = new MessagePublishInfo()
+                {
+
+                    public AMQShortString getExchange()
+                    {
+                        return exchange;
+                    }
+
+                    public boolean isImmediate()
+                    {
+                        return immediate;
+                    }
+
+                    public boolean isMandatory()
+                    {
+                        return mandatory;
+                    }
+
+                    public AMQShortString getRoutingKey()
+                    {
+                        return routingKey;
+                    }
+                };
+            } else
+            {
+                throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+            }
+            rs.close();
+            return result;
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+                }
+            }
+        }
+    }
+
+    public ContentHeaderBody getContentHeaderBody(StorableMessage m)
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        ContentHeaderBody result;
+        try
+        {
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            PreparedStatement pstmt = connection.getStatements()[GET_CONTENT_HEADER];
+            if (pstmt == null)
+            {
+                pstmt = connection.getConnection().prepareStatement("SELECT Header from " + _tableNameMessage +
+                        " WHERE MessageID = ?");
+                connection.getStatements()[GET_CONTENT_HEADER] = pstmt;
+            }
+            pstmt.setLong(1, m.getMessageId());
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next())
+            {
+                result = new ContentHeaderBody(ByteBuffer.wrap(rs.getBlob(1).getBytes(1, (int) rs.getBlob(1).length())), 0);
+            } else
+            {
+                throw new InternalErrorException("Cannot get Content Header of message: " + m);
+            }
+            rs.close();
+            return result;
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot get Content Header of message: " + m);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot get Content Header of message: " + m);
+                }
+            }
+        }
+    }
+
+    private List<StorableMessage> getAllMessages(MyConnection connection, StorableQueue queue)
+            throws
+            SQLException,
+            AMQException
+    {
+        List<StorableMessage> result = new ArrayList<StorableMessage>();
+        TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+        MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
+        PreparedStatement pstmt = connection.getStatements()[GET_ALL_MESSAGES];
+        if (pstmt == null)
+        {
+            pstmt = connection.getConnection().prepareStatement("SELECT " + _tableNameMessage + ".MessageID, Header FROM " +
+                    _tableNameMessage +
+                    " INNER JOIN " +
+                    _tableNameQueueMessageRelation +
+                    " ON " +
+                    _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" +
+                    " WHERE " +
+                    _tableNameQueueMessageRelation + ".QueueID = ?" +
+                    " AND " +
+                    _tableNameQueueMessageRelation + ".Prepared = 0");
+            connection.getStatements()[GET_ALL_MESSAGES] = pstmt;
+        }
+        pstmt.setInt(1, queue.getQueueID());
+        ResultSet rs = pstmt.executeQuery();
+        AMQMessage foundMessage;
+        // ContentHeaderBody hb;
+        while (rs.next())
+        {
+            foundMessage = new AMQMessage(rs.getLong(1), this, messageHandleFactory, txnContext);
+            result.add(foundMessage);
+        }
+        rs.close();
+        return result;
+    }
+
+    private HashMap<Integer, AMQQueue> recover()
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        HashMap<Integer, AMQQueue> result = new HashMap<Integer, AMQQueue>();
+        try
+        {
+            // re-create all the queues
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            Statement stmt = connection.getConnection().createStatement();
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameQueue);
+            AMQQueue foundQueue;
+            List<StorableMessage> foundMessages;
+            StoreContext context = new StoreContext();
+            while (rs.next())
+            {
+                AMQShortString owner = null;
+                if (rs.getString(3) != null)
+                {
+                    owner = new AMQShortString(rs.getString(3));
+                }
+                foundQueue = new AMQQueue(new AMQShortString(rs.getString(2)),
+                        true, owner, false, _virtualHost);
+                // get all the Messages of that queue
+                foundMessages = getAllMessages(connection, foundQueue);
+                // enqueue those messages
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug("Recovering " + foundMessages.size() + " messages for queue " + foundQueue.getName());
+                }
+                for (StorableMessage foundMessage : foundMessages)
+                {
+                    foundMessage.staged();                    
+                    foundMessage.enqueue(foundQueue);
+                    foundQueue.enqueue(foundMessage);
+                    foundQueue.process(context, (AMQMessage) foundMessage, false);
+                }
+                // add the queue in the result map
+                result.put(foundQueue.getQueueID(), foundQueue);
+                // add it in the registry
+                _virtualHost.getQueueRegistry().registerQueue(foundQueue);
+            }
+            rs.close();
+            return result;
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot recover: ", e);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot recover: ", e);
+                }
+            }
+        }
+    }
+
+    private void recoverExchanges(HashMap<Integer, AMQQueue> queueMap)
+            throws
+            InternalErrorException
+    {
+        MyConnection connection = null;
+        try
+        {
+            // re-create all the exchanges
+            connection = (MyConnection) _connectionPool.acquireInstance();
+            Statement stmt = connection.getConnection().createStatement();
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameExchange);
+            Exchange foundExchange;
+            AMQQueue foundQueue;
+            while (rs.next())
+            {
+                foundExchange = _virtualHost.getExchangeFactory().createExchange(
+                        new AMQShortString(rs.getString(1)), new AMQShortString(rs.getString(2)), true, false, 0);
+                // get all the bindings
+                Statement stmtb = connection.getConnection().createStatement();
+                ResultSet rsb = stmtb.executeQuery("SELECT * FROM " + _tableNameExchangeQueueRelation +
+                        " WHERE Name = '" + rs.getString(1) + "'");
+                while (rsb.next())
+                {
+                    foundQueue = queueMap.get(new Integer(rsb.getInt(1)));
+                    if (foundQueue != null)
+                    {
+                        // the field table
+                        FieldTable ft = null;
+                        if (rsb.getBlob(4) != null)
+                        {
+                            long length = rsb.getBlob(4).length();
+                            ByteBuffer buffer = ByteBuffer.wrap(rsb.getBlob(4).getBytes(1, (int) length));
+                            ft = new FieldTable(buffer, length);
+                        }
+                        foundQueue.bind(new AMQShortString(rsb.getString(3)), ft, foundExchange);
+                    }
+                }
+                rsb.close();
+                // register this exchange
+                _virtualHost.getExchangeRegistry().registerExchange(foundExchange);
+            }
+            rs.close();
+        } catch (Exception e)
+        {
+            throw new InternalErrorException("Cannot recover: ", e);
+        } finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.getConnection().commit();
+                    _connectionPool.releaseInstance(connection);
+                } catch (SQLException e)
+                {
+                    // we did not manage to commit this connection
+                    // it is better to release it
+                    _connectionPool.releaseDeadInstance();
+                    throw new InternalErrorException("Cannot recover: ", e);
+                }
+            }
+        }
+    }
+
+    private void destroy(MyConnection connection, StorableMessage m)
+            throws
+            SQLException
+    {
+        PreparedStatement pstmt = connection.getStatements()[DELETE_MESSAGE];
+        if (pstmt == null)
+        {
+            pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameMessage +
+                    " WHERE MessageID = ?");
+            connection.getStatements()[DELETE_MESSAGE] = pstmt;
+        }
+        pstmt.setLong(1, m.getMessageId());
+        pstmt.executeUpdate();
+    }
+
+    private JDBCTransaction getTx(Xid xid)
+            throws
+            UnknownXidException
+    {
+        JDBCTransaction tx = null;
+        if (xid != null)
+        {
+            tx = _tm.getTransaction(xid);
+        }
+        return tx;
+    }
+
+    /**
+     * setupConnections - Initialize the connections
+     *
+     * @return true if ok
+     */
+    private synchronized boolean setupConnections()
+    {
+        try
+        {
+            if (_connectionPool == null)
+            {
+                // In an embedded environment, loading the driver also starts Derby.
+                Class.forName(_driver).newInstance();
+                _connectionPool = new ConnectionPool(_maxSize);
+            }
+        }
+        catch (Exception e)
+        {
+            _log.warn("Setup connections trouble", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Try to create the connection and table.
+     * If this fails, then we will exit.
+     */
+    protected synchronized boolean setupStore(boolean dropTables)
+    {
+        if (!setupConnections())
+        {
+            return false;
+        }
+        MyConnection myconnection = null;
+        try
+        {
+            myconnection = (MyConnection) _connectionPool.acquireInstance();
+            Statement stmt = myconnection._connection.createStatement();
+            /*
+            * TODO Need some management interface to delete the table!
+            */
+            if (dropTables)
+            {
+                try
+                {
+                    stmt.executeUpdate("DROP TABLE " + _tableNameMessage);
+                    myconnection._connection.commit();
+                }
+                catch (SQLException ex)
+                {
+                    // don't want to print error - chances are it
+                    // just reports that the table does not exist
+                    // ex.printStackTrace();
+                }
+                try
+                {
+                    stmt.executeUpdate("DROP TABLE " + _tableNameQueue);
+                    myconnection._connection.commit();
+                }
+                catch (SQLException ex)
+                {
+                    // ex.printStackTrace();
+                }
+                try
+                {
+                    stmt.executeUpdate("DROP TABLE " + _tableNameQueueMessageRelation);
+                    myconnection._connection.commit();
+                }
+                catch (SQLException ex)
+                {
+                    // ex.printStackTrace();
+                }
+                try
+                {
+                    stmt.executeUpdate("DROP TABLE " + _tableNameExchange);
+                    myconnection._connection.commit();
+                }
+                catch (SQLException ex)
+                {
+                    // ex.printStackTrace();
+                }
+                try
+                {
+                    stmt.executeUpdate("DROP TABLE " + _tableNameExchangeQueueRelation);
+                    myconnection._connection.commit();
+                }
+                catch (SQLException ex)
+                {
+                    // ex.printStackTrace();
+                }
+                try
+                {
+                    stmt.executeUpdate("DROP TABLE " + _tableNameRecord);
+                    myconnection._connection.commit();
+                }
+                catch (SQLException ex)
+                {
+                    // ex.printStackTrace();
+                }
+                try
+                {
+                    stmt.executeUpdate("DROP TABLE " + _tableNameTransaction);
+                    myconnection._connection.commit();
+                }
+                catch (SQLException ex)
+                {
+                    // ex.printStackTrace();
+                }
+            }
+            // create the table for messages
+            try
+            {
+                stmt.executeUpdate("CREATE TABLE " + _tableNameMessage + " (MessageID FLOAT NOT NULL, Header BLOB," +
+                        " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," +
+                        " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))");
+                myconnection._connection.commit();
+            }
+            catch (SQLException ex)
+            {
+                // ex.printStackTrace();
+                // assume this is reporting that the table already exists:
+            }
+            // create the table for queues
+            try
+            {
+                stmt.executeUpdate("CREATE TABLE " + _tableNameQueue + " (QueueID INTEGER NOT NULL, " +
+                        "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))");
+                myconnection._connection.commit();
+            }
+            catch (SQLException ex)
+            {
+                //ex.printStackTrace();
+                // assume this is reporting that the table already exists:
+            }
+            // create the table for queue to message mapping
+            try
+            {
+                stmt.executeUpdate("CREATE TABLE " + _tableNameQueueMessageRelation + " (QueueID INTEGER NOT NULL, " +
+                        "MessageID FLOAT NOT NULL, Prepared INTEGER)");
+                myconnection._connection.commit();
+            }
+            catch (SQLException ex)
+            {
+                //ex.printStackTrace();
+                // assume this is reporting that the table already exists:
+            }
+            try
+            {
+                stmt.executeUpdate("CREATE TABLE " + _tableNameExchange + " (Name VARCHAR(1024) NOT NULL, " +
+                        "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))");
+                myconnection._connection.commit();
+            }
+            catch (SQLException ex)
+            {
+                //ex.printStackTrace();
+                // assume this is reporting that the table already exists:
+            }
+            try
+            {
+                stmt.executeUpdate("CREATE TABLE " + _tableNameExchangeQueueRelation + " (QueueID INTEGER NOT NULL, " +
+                        "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )");
+                myconnection._connection.commit();
+            }
+            catch (SQLException ex)
+            {
+                //ex.printStackTrace();
+                // assume this is reporting that the table already exists:
+            }
+            try
+            {
+                stmt.executeUpdate("CREATE TABLE " + _tableNameRecord + " (XID_ID FLOAT, Type INTEGER, MessageID FLOAT, " +
+                        "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))");
+                // we could alter the table with QueueID as foreign key
+                myconnection._connection.commit();
+            }
+            catch (SQLException ex)
+            {
+                //ex.printStackTrace();
+                // assume this is reporting that the table already exists:
+            }
+            try
+            {
+                stmt.executeUpdate("CREATE TABLE " + _tableNameTransaction + " (XID_ID FLOAT, FormatId INTEGER, " +
+                        "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))");
+                myconnection._connection.commit();
+            }
+            catch (SQLException ex)
+            {
+                // ex.printStackTrace();
+                // assume this is reporting that the table already exists:
+            }
+        }
+        catch (Throwable e)
+        {
+            _log.warn("Setup Store trouble: ", e);
+            return false;
+        }
+        finally
+        {
+            if (myconnection != null)
+            {
+                _connectionPool.releaseInstance(myconnection);
+            }
+        }
+        return true;
+    }
+    //========================================================================================
+    //============== the connection pool =====================================================
+    //========================================================================================
+
+    private class ConnectionPool extends Pool
+    {
+
+        /**
+         * Create a pool of specified size. Negative or null pool sizes are
+         * disallowed.
+         *
+         * @param poolSize The size of the pool to create. Should be 1 or
+         *                 greater.
+         * @throws Exception If the pool size is less than 1.
+         */
+        public ConnectionPool(int poolSize)
+                throws
+                Exception
+        {
+            super(poolSize);
+        }
+
+        /**
+         * @return An instance of the pooled object.
+         * @throws Exception In case of internal error.
+         */
+        protected MyConnection createInstance()
+                throws
+                Exception
+        {
+            try
+            {
+                // standard way to obtain a Connection object is to call the method DriverManager.getConnection,
+                // which takes a String containing a connection URL (uniform resource locator).
+                Connection conn = DriverManager.getConnection(_connectionURL);
+                //conn.setAutoCommit(true);
+                PreparedStatement[] st = new PreparedStatement[STATEMENT_SIZE];
+                for (int j = 0; j < STATEMENT_SIZE; j++)
+                {
+                    st[j] = null;
+                }
+                return new MyConnection(conn, st);
+            }
+            catch (SQLException e)
+            {
+                throw new Exception("sqlException when creating connection to " + _connectionURL, e);
+            }
+        }
+    }
+
+    public class MyConnection
+    {
+        // the connection
+        private Connection _connection = null;
+        // its associated prepared statements
+        private PreparedStatement[] _preparedStatements = null;
+
+        MyConnection(Connection con, PreparedStatement[] st)
+        {
+            _connection = con;
+            _preparedStatements = st;
+        }
+
+        public Connection getConnection()
+        {
+            return _connection;
+        }
+
+        public PreparedStatement[] getStatements()
+        {
+            return _preparedStatements;
+        }
+
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java Tue May 22 01:49:53 2007
@@ -29,6 +29,8 @@
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.exception.*;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.txn.MemoryDequeueRecord;
@@ -248,5 +250,24 @@
     public long getNewMessageId()
     {
         return _messageID++;
+    }
+
+
+    public ContentHeaderBody getContentHeaderBody(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        // do nothing this is only used during recovery
+        return null;
+    }
+
+    public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+       // do nothing this is only used during recovery
+        return null;
     }
 }