You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/06/19 15:29:23 UTC

svn commit: r669480 - /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java

Author: rgodfrey
Date: Thu Jun 19 06:29:23 2008
New Revision: 669480

URL: http://svn.apache.org/viewvc?rev=669480&view=rev
Log:
QPID-950 : Fixed Derby Message Store

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=669480&r1=669479&r2=669480&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Thu Jun 19 06:29:23 2008
@@ -1,11 +1,32 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.txn.TransactionalContext;
@@ -21,6 +42,7 @@
 import org.apache.mina.common.ByteBuffer;
 
 import java.io.File;
+import java.io.ByteArrayInputStream;
 import java.sql.DriverManager;
 import java.sql.Driver;
 import java.sql.Connection;
@@ -39,26 +61,6 @@
 import java.util.TreeMap;
 
 
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
 public class DerbyMessageStore implements MessageStore
 {
 
@@ -67,7 +69,7 @@
     private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
 
 
-    private static final String DERBY_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+    private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
 
     private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
 
@@ -91,6 +93,39 @@
     private String _connectionURL;
 
 
+
+    private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
+    private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+    private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
+    private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
+    private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
+    private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
+    private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
+    private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
+    private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
+    private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
+    private static final String SELECT_FROM_BINDINGS =
+            "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+    private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+    private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+    private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
+    private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
+    private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
+    private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
+    private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
+    private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+    private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
+    private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
+    private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
+    private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
+    private static final String SELECT_FROM_MESSAGE_META_DATA =
+            "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+    private static final String SELECT_FROM_MESSAGE_CONTENT =
+            "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
+    private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
+    private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+
+
     private enum State
     {
         INITIAL,
@@ -129,10 +164,6 @@
 
         createOrOpenDatabase(databasePath);
 
-
-
-
-
         // this recovers durable queues and persistent messages
 
         recover();
@@ -145,7 +176,7 @@
     {
         if(DRIVER_CLASS == null)
         {
-            DRIVER_CLASS = (Class<Driver>) Class.forName(DERBY_DRIVER_NAME);
+            DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
         }
     }
 
@@ -163,7 +194,7 @@
         createMessageMetaDataTable(conn);
         createMessageContentTable(conn);
 
-
+        conn.close();
     }
 
 
@@ -174,10 +205,10 @@
         {
             Statement stmt = conn.createStatement();
 
-            stmt.execute("CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )");
+            stmt.execute(CREATE_DB_VERSION_TABLE);
             stmt.close();
 
-            PreparedStatement pstmt = conn.prepareStatement("INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )");
+            PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
             pstmt.setInt(1, DB_VERSION);
             pstmt.execute();
             pstmt.close();
@@ -191,8 +222,8 @@
         if(!tableExists(EXCHANGE_TABLE_NAME, conn))
         {
             Statement stmt = conn.createStatement();
-            
-            stmt.execute("CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )");
+
+            stmt.execute(CREATE_EXCHANGE_TABLE);
             stmt.close();
         }
     }
@@ -202,7 +233,7 @@
         if(!tableExists(QUEUE_TABLE_NAME, conn))
         {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )");
+            stmt.execute(CREATE_QUEUE_TABLE);
             stmt.close();
         }
     }
@@ -212,8 +243,7 @@
         if(!tableExists(BINDINGS_TABLE_NAME, conn))
         {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )");
-
+            stmt.execute(CREATE_BINDINGS_TABLE);
 
             stmt.close();
         }
@@ -225,7 +255,7 @@
         if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
         {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )");
+            stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
 
             stmt.close();
         }
@@ -237,7 +267,7 @@
         if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
         {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )");
+            stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
 
             stmt.close();
         }
@@ -250,7 +280,7 @@
         if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
         {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )");
+            stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
 
             stmt.close();
         }
@@ -261,7 +291,7 @@
 
     private boolean tableExists(final String tableName, final Connection conn) throws SQLException
     {
-        PreparedStatement stmt = conn.prepareStatement("SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?");
+        PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
         stmt.setString(1, tableName);
         ResultSet rs = stmt.executeQuery();
         boolean exists = rs.next();
@@ -283,8 +313,6 @@
 
             recoverExchanges();
 
-//
-
             try
             {
 
@@ -317,7 +345,7 @@
 
 
         Statement stmt = conn.createStatement();
-        ResultSet rs = stmt.executeQuery("SELECT name, owner FROM " + QUEUE_TABLE_NAME);
+        ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
         Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
         while(rs.next())
         {
@@ -353,7 +381,7 @@
 
 
             Statement stmt = conn.createStatement();
-            ResultSet rs = stmt.executeQuery("SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME);
+            ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
 
             Exchange exchange;
             while(rs.next())
@@ -391,7 +419,7 @@
         {
             conn = newConnection();
 
-            PreparedStatement stmt = conn.prepareStatement("SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?");
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
             stmt.setString(1, exchange.getName().toString());
 
             ResultSet rs = stmt.executeQuery();
@@ -425,6 +453,7 @@
                     }
 
                     queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
+
                 }
             }
         }
@@ -439,9 +468,7 @@
 
     public void close() throws Exception
     {
-
         _closed.getAndSet(true);
-
     }
 
     public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -462,12 +489,11 @@
         MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
         try
         {
-            PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?");
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
             stmt.setLong(1,messageId);
             wrapper.setRequiresCommit();
             int results = stmt.executeUpdate();
 
-
             if (results == 0)
             {
                 if (localTx)
@@ -484,8 +510,7 @@
                 _logger.debug("Deleted metadata for message " + messageId);
             }
 
-
-            stmt = conn.prepareStatement("DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?");
+            stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
             stmt.setLong(1,messageId);
             results = stmt.executeUpdate();
 
@@ -528,7 +553,7 @@
                 {
                     conn = newConnection();
 
-                    PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )");
+                    PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
                     stmt.setString(1, exchange.getName().toString());
                     stmt.setString(2, exchange.getType().toString());
                     stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
@@ -542,7 +567,6 @@
                     if(conn != null)
                     {
                         conn.close();
-
                     }
                 }
             }
@@ -561,7 +585,7 @@
         try
         {
             conn = newConnection();
-            PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?");
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
             stmt.setString(1, exchange.getName().toString());
             int results = stmt.executeUpdate();
             if(results == 0)
@@ -606,16 +630,20 @@
             try
             {
                 conn = newConnection();
-                // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
-                PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )");
+                PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
                 stmt.setString(1, exchange.getName().toString() );
                 stmt.setString(2, queue.getName().toString());
                 stmt.setString(3, routingKey == null ? null : routingKey.toString());
                 if(args != null)
                 {
+                    /* This would be the Java 6 way of setting a Blob
                     Blob blobArgs = conn.createBlob();
                     blobArgs.setBytes(0, args.getDataAsBytes());
                     stmt.setBlob(4, blobArgs);
+                    */
+                    byte[] bytes = args.getDataAsBytes();
+                    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+                    stmt.setBinaryStream(4, bis, bytes.length);
                 }
                 else
                 {
@@ -662,7 +690,7 @@
         {
             conn = newConnection();
             // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
-            PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?");
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
             stmt.setString(1, exchange.getName().toString() );
             stmt.setString(2, queue.getName().toString());
             stmt.setString(3, routingKey == null ? null : routingKey.toString());
@@ -711,7 +739,7 @@
                 Connection conn = newConnection();
 
                 PreparedStatement stmt =
-                        conn.prepareStatement("INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)");
+                        conn.prepareStatement(INSERT_INTO_QUEUE);
 
                 stmt.setString(1, queue.getName().toString());
                 stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
@@ -733,12 +761,12 @@
 
     private Connection newConnection() throws SQLException
     {
-        return DriverManager.getConnection(_connectionURL);
+        final Connection connection = DriverManager.getConnection(_connectionURL);
+        return connection;
     }
 
     public void removeQueue(final AMQQueue queue) throws AMQException
     {
-
         AMQShortString name = queue.getName();
         _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
         Connection conn = null;
@@ -747,7 +775,7 @@
         try
         {
             conn = newConnection();
-            PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?");
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
             stmt.setString(1, name.toString());
             int results = stmt.executeUpdate();
 
@@ -785,15 +813,15 @@
 
     public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
+        AMQShortString name = queue.getName();
 
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
         ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-        AMQShortString name = queue.getName();
 
         try
         {
-            PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)");
+            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
             stmt.setString(1,name.toString());
             stmt.setLong(2,messageId);
             stmt.executeUpdate();
@@ -826,15 +854,15 @@
 
     public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
-
         AMQShortString name = queue.getName();
+
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
         ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
 
         try
         {
-            PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?");
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
             stmt.setString(1,name.toString());
             stmt.setLong(2,messageId);
             int results = stmt.executeUpdate();
@@ -931,17 +959,18 @@
 
         try
         {
+            Connection conn = connWrapper.getConnection();
             if(connWrapper.requiresCommit())
             {
-                Connection conn = connWrapper.getConnection();
                 conn.commit();
 
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("commit tran completed");
                 }
-                conn.close();
+
             }
+            conn.close();
         }
         catch (SQLException e)
         {
@@ -1002,21 +1031,25 @@
                                       int index,
                                       ContentChunk contentBody,
                                       boolean lastContentBody) throws AMQException
-    {        
+    {
         boolean localTx = getOrCreateTransaction(context);
         Connection conn = getConnection(context);
         ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
 
         try
         {
-            PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)");
+            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
             stmt.setLong(1,messageId);
             stmt.setInt(2, index);
             byte[] chunkData = new byte[contentBody.getSize()];
             contentBody.getData().duplicate().get(chunkData);
+            /* this would be the Java 6 way of doing things
             Blob dataAsBlob = conn.createBlob();
             dataAsBlob.setBytes(1L, chunkData);
             stmt.setBlob(3, dataAsBlob);
+            */
+            ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+            stmt.setBinaryStream(3, bis, chunkData.length);
             stmt.executeUpdate();
             connWrapper.requiresCommit();
 
@@ -1048,7 +1081,7 @@
         try
         {
 
-            PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)");
+            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
             stmt.setLong(1,messageId);
             stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
             stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
@@ -1060,9 +1093,13 @@
             byte[] underlying = new byte[bodySize];
             ByteBuffer buf = ByteBuffer.wrap(underlying);
             headerBody.writePayload(buf);
+/*
             Blob dataAsBlob = conn.createBlob();
             dataAsBlob.setBytes(1L, underlying);
             stmt.setBlob(6, dataAsBlob);
+*/
+            ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+            stmt.setBinaryStream(6,bis,underlying.length);
 
             stmt.setInt(7, mmd.getContentChunkCount());
 
@@ -1096,7 +1133,7 @@
         try
         {
 
-            PreparedStatement stmt = conn.prepareStatement("SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?");
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
             stmt.setLong(1,messageId);
             ResultSet rs = stmt.executeQuery();
 
@@ -1181,7 +1218,7 @@
                 try
                 {
 
-                    PreparedStatement stmt = conn.prepareStatement("SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?");
+                    PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
                     stmt.setLong(1,messageId);
                     stmt.setInt(2, index);
                     ResultSet rs = stmt.executeQuery();
@@ -1269,6 +1306,7 @@
         public void process() throws AMQException
         {
             _queue.enqueue(_context, _message);
+
         }
 
     }
@@ -1303,7 +1341,7 @@
             TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
 
             Statement stmt = conn.createStatement();
-            ResultSet rs = stmt.executeQuery("SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME);
+            ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
 
 
             while (rs.next())
@@ -1318,6 +1356,7 @@
                 if (queue == null)
                 {
                     queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
+
                     _virtualHost.getQueueRegistry().registerQueue(queue);
                     queues.put(queueName, queue);
                 }