You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/16 14:21:48 UTC

svn commit: r765593 - /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java

Author: ritchiem
Date: Thu Apr 16 12:21:48 2009
New Revision: 765593

URL: http://svn.apache.org/viewvc?rev=765593&view=rev
Log:
QPID-1814 : Relax Derby so that it does not raise an error when asked to create an Exchange or Queue that already exists

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=765593&r1=765592&r2=765593&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Thu Apr 16 12:21:48 2009
@@ -104,9 +104,12 @@
     private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
     private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
     private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
+    private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
     private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
     private static final String SELECT_FROM_BINDINGS =
             "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+    private static final String FIND_BINDING =
+            "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
     private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
     private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
     private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
@@ -631,29 +634,41 @@
             try
             {
                 conn = newConnection();
-                PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
+
+                PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
                 stmt.setString(1, exchange.getName().toString() );
                 stmt.setString(2, queue.getName().toString());
                 stmt.setString(3, routingKey == null ? null : routingKey.toString());
-                if(args != null)
-                {
-                    /* This would be the Java 6 way of setting a Blob
-                    Blob blobArgs = conn.createBlob();
-                    blobArgs.setBytes(0, args.getDataAsBytes());
-                    stmt.setBlob(4, blobArgs);
-                    */
-                    byte[] bytes = args.getDataAsBytes();
-                    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-                    stmt.setBinaryStream(4, bis, bytes.length);
-                }
-                else
+
+                ResultSet rs = stmt.executeQuery();
+
+                // If this binding is not already in the store then create it.
+                if (!rs.next())
                 {
-                    stmt.setNull(4, Types.BLOB);
-                }
+                    stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
+                    stmt.setString(1, exchange.getName().toString() );
+                    stmt.setString(2, queue.getName().toString());
+                    stmt.setString(3, routingKey == null ? null : routingKey.toString());
+                    if(args != null)
+                    {
+                        /* This would be the Java 6 way of setting a Blob
+                        Blob blobArgs = conn.createBlob();
+                        blobArgs.setBytes(0, args.getDataAsBytes());
+                        stmt.setBlob(4, blobArgs);
+                        */
+                        byte[] bytes = args.getDataAsBytes();
+                        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+                        stmt.setBinaryStream(4, bis, bytes.length);
+                    }
+                    else
+                    {
+                        stmt.setNull(4, Types.BLOB);
+                    }
 
-                stmt.executeUpdate();
-                conn.commit();
-                stmt.close();
+                    stmt.executeUpdate();
+                    conn.commit();
+                    stmt.close();
+                }
             }
             catch (SQLException e)
             {
@@ -744,19 +759,27 @@
             {
                 Connection conn = newConnection();
 
-                PreparedStatement stmt =
-                        conn.prepareStatement(INSERT_INTO_QUEUE);
-
+                PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
                 stmt.setString(1, queue.getName().toString());
-                stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
 
-                stmt.execute();
+                ResultSet rs = stmt.executeQuery();
 
-                stmt.close();
+                // If we don't have any data in the result set then we can add this queue
+                if (!rs.next())
+                {
+                    stmt = conn.prepareStatement(INSERT_INTO_QUEUE);
 
-                conn.commit();
+                    stmt.setString(1, queue.getName().toString());
+                    stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
 
-                conn.close();
+                    stmt.execute();
+
+                    stmt.close();
+
+                    conn.commit();
+
+                    conn.close();
+                }
             }
             catch (SQLException e)
             {
@@ -889,7 +912,7 @@
 
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+                _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
             }
         }
         catch (SQLException e)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org