You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/06/19 15:29:23 UTC
svn commit: r669480 -
/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
Author: rgodfrey
Date: Thu Jun 19 06:29:23 2008
New Revision: 669480
URL: http://svn.apache.org/viewvc?rev=669480&view=rev
Log:
QPID-950 : Fixed Derby Message Store
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=669480&r1=669479&r2=669480&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Thu Jun 19 06:29:23 2008
@@ -1,11 +1,32 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
package org.apache.qpid.server.store;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -21,6 +42,7 @@
import org.apache.mina.common.ByteBuffer;
import java.io.File;
+import java.io.ByteArrayInputStream;
import java.sql.DriverManager;
import java.sql.Driver;
import java.sql.Connection;
@@ -39,26 +61,6 @@
import java.util.TreeMap;
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
public class DerbyMessageStore implements MessageStore
{
@@ -67,7 +69,7 @@
private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
- private static final String DERBY_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+ private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
@@ -91,6 +93,39 @@
private String _connectionURL;
+
+ private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
+ private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+ private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
+ private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
+ private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
+ private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
+ private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
+ private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
+ private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
+ private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
+ private static final String SELECT_FROM_BINDINGS =
+ "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+ private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+ private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
+ private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
+ private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
+ private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
+ private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
+ private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+ private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
+ private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
+ private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
+ private static final String SELECT_FROM_MESSAGE_META_DATA =
+ "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String SELECT_FROM_MESSAGE_CONTENT =
+ "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
+ private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+
+
private enum State
{
INITIAL,
@@ -129,10 +164,6 @@
createOrOpenDatabase(databasePath);
-
-
-
-
// this recovers durable queues and persistent messages
recover();
@@ -145,7 +176,7 @@
{
if(DRIVER_CLASS == null)
{
- DRIVER_CLASS = (Class<Driver>) Class.forName(DERBY_DRIVER_NAME);
+ DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
}
}
@@ -163,7 +194,7 @@
createMessageMetaDataTable(conn);
createMessageContentTable(conn);
-
+ conn.close();
}
@@ -174,10 +205,10 @@
{
Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )");
+ stmt.execute(CREATE_DB_VERSION_TABLE);
stmt.close();
- PreparedStatement pstmt = conn.prepareStatement("INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )");
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
pstmt.setInt(1, DB_VERSION);
pstmt.execute();
pstmt.close();
@@ -191,8 +222,8 @@
if(!tableExists(EXCHANGE_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
-
- stmt.execute("CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )");
+
+ stmt.execute(CREATE_EXCHANGE_TABLE);
stmt.close();
}
}
@@ -202,7 +233,7 @@
if(!tableExists(QUEUE_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )");
+ stmt.execute(CREATE_QUEUE_TABLE);
stmt.close();
}
}
@@ -212,8 +243,7 @@
if(!tableExists(BINDINGS_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )");
-
+ stmt.execute(CREATE_BINDINGS_TABLE);
stmt.close();
}
@@ -225,7 +255,7 @@
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )");
+ stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
stmt.close();
}
@@ -237,7 +267,7 @@
if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )");
+ stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
stmt.close();
}
@@ -250,7 +280,7 @@
if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )");
+ stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
stmt.close();
}
@@ -261,7 +291,7 @@
private boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
- PreparedStatement stmt = conn.prepareStatement("SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?");
+ PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
stmt.setString(1, tableName);
ResultSet rs = stmt.executeQuery();
boolean exists = rs.next();
@@ -283,8 +313,6 @@
recoverExchanges();
-//
-
try
{
@@ -317,7 +345,7 @@
Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT name, owner FROM " + QUEUE_TABLE_NAME);
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
while(rs.next())
{
@@ -353,7 +381,7 @@
Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME);
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
Exchange exchange;
while(rs.next())
@@ -391,7 +419,7 @@
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement("SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?");
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
stmt.setString(1, exchange.getName().toString());
ResultSet rs = stmt.executeQuery();
@@ -425,6 +453,7 @@
}
queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
+
}
}
}
@@ -439,9 +468,7 @@
public void close() throws Exception
{
-
_closed.getAndSet(true);
-
}
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -462,12 +489,11 @@
MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
try
{
- PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?");
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
wrapper.setRequiresCommit();
int results = stmt.executeUpdate();
-
if (results == 0)
{
if (localTx)
@@ -484,8 +510,7 @@
_logger.debug("Deleted metadata for message " + messageId);
}
-
- stmt = conn.prepareStatement("DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?");
+ stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
results = stmt.executeUpdate();
@@ -528,7 +553,7 @@
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )");
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
stmt.setString(1, exchange.getName().toString());
stmt.setString(2, exchange.getType().toString());
stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
@@ -542,7 +567,6 @@
if(conn != null)
{
conn.close();
-
}
}
}
@@ -561,7 +585,7 @@
try
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?");
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
stmt.setString(1, exchange.getName().toString());
int results = stmt.executeUpdate();
if(results == 0)
@@ -606,16 +630,20 @@
try
{
conn = newConnection();
- // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
- PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )");
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
stmt.setString(1, exchange.getName().toString() );
stmt.setString(2, queue.getName().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
if(args != null)
{
+ /* This would be the Java 6 way of setting a Blob
Blob blobArgs = conn.createBlob();
blobArgs.setBytes(0, args.getDataAsBytes());
stmt.setBlob(4, blobArgs);
+ */
+ byte[] bytes = args.getDataAsBytes();
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ stmt.setBinaryStream(4, bis, bytes.length);
}
else
{
@@ -662,7 +690,7 @@
{
conn = newConnection();
// exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
- PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?");
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
stmt.setString(1, exchange.getName().toString() );
stmt.setString(2, queue.getName().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
@@ -711,7 +739,7 @@
Connection conn = newConnection();
PreparedStatement stmt =
- conn.prepareStatement("INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)");
+ conn.prepareStatement(INSERT_INTO_QUEUE);
stmt.setString(1, queue.getName().toString());
stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
@@ -733,12 +761,12 @@
private Connection newConnection() throws SQLException
{
- return DriverManager.getConnection(_connectionURL);
+ final Connection connection = DriverManager.getConnection(_connectionURL);
+ return connection;
}
public void removeQueue(final AMQQueue queue) throws AMQException
{
-
AMQShortString name = queue.getName();
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
Connection conn = null;
@@ -747,7 +775,7 @@
try
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?");
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
stmt.setString(1, name.toString());
int results = stmt.executeUpdate();
@@ -785,15 +813,15 @@
public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
+ AMQShortString name = queue.getName();
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
- AMQShortString name = queue.getName();
try
{
- PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)");
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
stmt.setString(1,name.toString());
stmt.setLong(2,messageId);
stmt.executeUpdate();
@@ -826,15 +854,15 @@
public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
-
AMQShortString name = queue.getName();
+
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
- PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?");
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
stmt.setString(1,name.toString());
stmt.setLong(2,messageId);
int results = stmt.executeUpdate();
@@ -931,17 +959,18 @@
try
{
+ Connection conn = connWrapper.getConnection();
if(connWrapper.requiresCommit())
{
- Connection conn = connWrapper.getConnection();
conn.commit();
if (_logger.isDebugEnabled())
{
_logger.debug("commit tran completed");
}
- conn.close();
+
}
+ conn.close();
}
catch (SQLException e)
{
@@ -1002,21 +1031,25 @@
int index,
ContentChunk contentBody,
boolean lastContentBody) throws AMQException
- {
+ {
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
- PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)");
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
stmt.setInt(2, index);
byte[] chunkData = new byte[contentBody.getSize()];
contentBody.getData().duplicate().get(chunkData);
+ /* this would be the Java 6 way of doing things
Blob dataAsBlob = conn.createBlob();
dataAsBlob.setBytes(1L, chunkData);
stmt.setBlob(3, dataAsBlob);
+ */
+ ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+ stmt.setBinaryStream(3, bis, chunkData.length);
stmt.executeUpdate();
connWrapper.requiresCommit();
@@ -1048,7 +1081,7 @@
try
{
- PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)");
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
@@ -1060,9 +1093,13 @@
byte[] underlying = new byte[bodySize];
ByteBuffer buf = ByteBuffer.wrap(underlying);
headerBody.writePayload(buf);
+/*
Blob dataAsBlob = conn.createBlob();
dataAsBlob.setBytes(1L, underlying);
stmt.setBlob(6, dataAsBlob);
+*/
+ ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ stmt.setBinaryStream(6,bis,underlying.length);
stmt.setInt(7, mmd.getContentChunkCount());
@@ -1096,7 +1133,7 @@
try
{
- PreparedStatement stmt = conn.prepareStatement("SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?");
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
ResultSet rs = stmt.executeQuery();
@@ -1181,7 +1218,7 @@
try
{
- PreparedStatement stmt = conn.prepareStatement("SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?");
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
stmt.setInt(2, index);
ResultSet rs = stmt.executeQuery();
@@ -1269,6 +1306,7 @@
public void process() throws AMQException
{
_queue.enqueue(_context, _message);
+
}
}
@@ -1303,7 +1341,7 @@
TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME);
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
while (rs.next())
@@ -1318,6 +1356,7 @@
if (queue == null)
{
queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
+
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queueName, queue);
}