You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/16 14:21:48 UTC
svn commit: r765593 -
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
Author: ritchiem
Date: Thu Apr 16 12:21:48 2009
New Revision: 765593
URL: http://svn.apache.org/viewvc?rev=765593&view=rev
Log:
QPID-1814 : Relax Derby so that it does not error if you create an existing Exchange or Queue
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=765593&r1=765592&r2=765593&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Thu Apr 16 12:21:48 2009
@@ -104,9 +104,12 @@
private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
+ private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
private static final String SELECT_FROM_BINDINGS =
"SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+ private static final String FIND_BINDING =
+ "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
@@ -631,29 +634,41 @@
try
{
conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
stmt.setString(1, exchange.getName().toString() );
stmt.setString(2, queue.getName().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
- if(args != null)
- {
- /* This would be the Java 6 way of setting a Blob
- Blob blobArgs = conn.createBlob();
- blobArgs.setBytes(0, args.getDataAsBytes());
- stmt.setBlob(4, blobArgs);
- */
- byte[] bytes = args.getDataAsBytes();
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- stmt.setBinaryStream(4, bis, bytes.length);
- }
- else
+
+ ResultSet rs = stmt.executeQuery();
+
+ // If this binding is not already in the store then create it.
+ if (!rs.next())
{
- stmt.setNull(4, Types.BLOB);
- }
+ stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
+ stmt.setString(1, exchange.getName().toString() );
+ stmt.setString(2, queue.getName().toString());
+ stmt.setString(3, routingKey == null ? null : routingKey.toString());
+ if(args != null)
+ {
+ /* This would be the Java 6 way of setting a Blob
+ Blob blobArgs = conn.createBlob();
+ blobArgs.setBytes(0, args.getDataAsBytes());
+ stmt.setBlob(4, blobArgs);
+ */
+ byte[] bytes = args.getDataAsBytes();
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ stmt.setBinaryStream(4, bis, bytes.length);
+ }
+ else
+ {
+ stmt.setNull(4, Types.BLOB);
+ }
- stmt.executeUpdate();
- conn.commit();
- stmt.close();
+ stmt.executeUpdate();
+ conn.commit();
+ stmt.close();
+ }
}
catch (SQLException e)
{
@@ -744,19 +759,27 @@
{
Connection conn = newConnection();
- PreparedStatement stmt =
- conn.prepareStatement(INSERT_INTO_QUEUE);
-
+ PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
stmt.setString(1, queue.getName().toString());
- stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
- stmt.execute();
+ ResultSet rs = stmt.executeQuery();
- stmt.close();
+ // If we don't have any data in the result set then we can add this queue
+ if (!rs.next())
+ {
+ stmt = conn.prepareStatement(INSERT_INTO_QUEUE);
- conn.commit();
+ stmt.setString(1, queue.getName().toString());
+ stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
- conn.close();
+ stmt.execute();
+
+ stmt.close();
+
+ conn.commit();
+
+ conn.close();
+ }
}
catch (SQLException e)
{
@@ -889,7 +912,7 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+ _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
}
}
catch (SQLException e)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org