You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/08/30 14:19:42 UTC

svn commit: r571129 [3/15] - in /incubator/qpid/trunk/qpid/java: ./ broker/ broker/bin/ broker/distribution/src/main/assembly/ broker/etc/ broker/src/main/java/org/apache/log4j/ broker/src/main/java/org/apache/qpid/configuration/ broker/src/main/java/o...

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java Thu Aug 30 05:19:31 2007
@@ -17,29 +17,50 @@
  */
 package org.apache.qpid.server.messageStore;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.InvalidXidException;
+import org.apache.qpid.server.exception.MessageAlreadyStagedException;
+import org.apache.qpid.server.exception.MessageDoesntExistException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
+import org.apache.qpid.server.exception.QueueDoesntExistException;
+import org.apache.qpid.server.exception.UnknownXidException;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.*;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.server.txn.JDBCAbstractRecord;
+import org.apache.qpid.server.txn.JDBCDequeueRecord;
+import org.apache.qpid.server.txn.JDBCEnqueueRecord;
+import org.apache.qpid.server.txn.JDBCTransaction;
+import org.apache.qpid.server.txn.JDBCTransactionManager;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.TransactionRecord;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.XidImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.transaction.xa.Xid;
-import java.util.Collection;
-import java.util.List;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.sql.*;
+import java.util.List;
 
 /**
  * Created by Arnaud Simon
@@ -173,16 +194,18 @@
                 if (pstmt == null)
                 {
                     pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchange +
-                            " (Name,Type) VALUES (?,?)");
+                                                                        " (Name,Type) VALUES (?,?)");
                     connection.getStatements()[CREATE_EXCHANGE] = pstmt;
                 }
                 pstmt.setString(1, exchange.getName().asString());
                 pstmt.setString(2, exchange.getType().asString());
                 pstmt.executeUpdate();
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
-            } finally
+            }
+            finally
             {
                 if (connection != null)
                 {
@@ -190,7 +213,8 @@
                     {
                         connection.getConnection().commit();
                         _connectionPool.releaseInstance(connection);
-                    } catch (SQLException e)
+                    }
+                    catch (SQLException e)
                     {
                         // we did not manage to commit this connection
                         // it is better to release it
@@ -216,15 +240,17 @@
                 if (pstmt == null)
                 {
                     pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchange +
-                            " WHERE Name = ?");
+                                                                        " WHERE Name = ?");
                     connection.getStatements()[DELETE_EXCHANGE] = pstmt;
                 }
                 pstmt.setString(1, exchange.getName().asString());
                 pstmt.executeUpdate();
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
-            } finally
+            }
+            finally
             {
                 if (connection != null)
                 {
@@ -232,7 +258,8 @@
                     {
                         connection.getConnection().commit();
                         _connectionPool.releaseInstance(connection);
-                    } catch (SQLException e)
+                    }
+                    catch (SQLException e)
                     {
                         // we did not manage to commit this connection
                         // it is better to release it
@@ -258,7 +285,7 @@
                 if (pstmt == null)
                 {
                     pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchangeQueueRelation +
-                            " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)");
+                                                                        " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)");
                     connection.getStatements()[BIND_QUEUE] = pstmt;
                 }
                 pstmt.setInt(1, queue.getQueueID());
@@ -267,15 +294,18 @@
                 if (args != null)
                 {
                     pstmt.setBytes(4, args.getDataAsBytes());
-                } else
+                }
+                else
                 {
                     pstmt.setBytes(4, null);
                 }
                 pstmt.executeUpdate();
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
-            } finally
+            }
+            finally
             {
                 if (connection != null)
                 {
@@ -283,7 +313,8 @@
                     {
                         connection.getConnection().commit();
                         _connectionPool.releaseInstance(connection);
-                    } catch (SQLException e)
+                    }
+                    catch (SQLException e)
                     {
                         // we did not manage to commit this connection
                         // it is better to release it
@@ -307,17 +338,19 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchangeQueueRelation +
-                        " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?");
+                                                                    " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?");
                 connection.getStatements()[UNBIND_QUEUE] = pstmt;
             }
             pstmt.setInt(1, queue.getQueueID());
             pstmt.setString(2, exchange.getName().asString());
             pstmt.setString(3, routingKey.asString());
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -325,7 +358,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -349,7 +383,7 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueue +
-                        " (QueueID,Name,Owner) VALUES (?,?,?)");
+                                                                    " (QueueID,Name,Owner) VALUES (?,?,?)");
                 connection.getStatements()[CREATE_QUEUE] = pstmt;
             }
             pstmt.setInt(1, queue.getQueueID());
@@ -357,15 +391,18 @@
             if (queue.getOwner() != null)
             {
                 pstmt.setString(3, queue.getOwner().asString());
-            } else
+            }
+            else
             {
                 pstmt.setString(3, null);
             }
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot create Queue: " + queue, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -373,7 +410,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -397,15 +435,17 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueue +
-                        " WHERE QueueID = ?");
+                                                                    " WHERE QueueID = ?");
                 connection.getStatements()[DELETE_QUEUE] = pstmt;
             }
             pstmt.setInt(1, queue.getQueueID());
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot remove Queue: " + queue, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -413,7 +453,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -439,10 +480,12 @@
         {
             connection = (MyConnection) _connectionPool.acquireInstance();
             stage(connection, m);
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot stage Message: " + m, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -450,7 +493,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -470,19 +514,21 @@
         if (!m.isStaged())
         {
             _log.error("Cannot append content of message Id "
-                    + m.getMessageId() + " as it has not been staged");
+                       + m.getMessageId() + " as it has not been staged");
             throw new MessageDoesntExistException("Cannot append content of message Id "
-                    + m.getMessageId() + " as it has not been staged");
+                                                  + m.getMessageId() + " as it has not been staged");
         }
         MyConnection connection = null;
         try
         {
             connection = (MyConnection) _connectionPool.acquireInstance();
             appendContent(connection, m, data, offset, size);
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot stage Message: " + m, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -490,7 +536,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -515,7 +562,7 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
-                        " WHERE MessageID = ? ");
+                                                                    " WHERE MessageID = ? ");
                 connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
             }
             pstmt.setLong(1, m.getMessageId());
@@ -523,7 +570,7 @@
             if (!rs.next())
             {
                 throw new MessageDoesntExistException("Cannot load content of message Id "
-                        + m.getMessageId() + " as it has not been found");
+                                                      + m.getMessageId() + " as it has not been found");
             }
             Blob myBlob = rs.getBlob(1);
 
@@ -532,21 +579,25 @@
                 if (size == 0)
                 {
                     result = myBlob.getBytes(offset, (int) myBlob.length());
-                } else
+                }
+                else
                 {
                     result = myBlob.getBytes(offset, size);
                 }
-            } else
+            }
+            else
             {
                 throw new MessageDoesntExistException("Cannot load content of message Id "
-                        + m.getMessageId() + " as it has not been found");
+                                                      + m.getMessageId() + " as it has not been found");
             }
             rs.close();
             return result;
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot load Message: " + m, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -554,7 +605,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -575,10 +627,12 @@
         {
             connection = (MyConnection) _connectionPool.acquireInstance();
             destroy(connection, m);
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot destroy message: " + m, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -586,7 +640,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -613,14 +668,16 @@
         {
             // add an enqueue record
             tx.addRecord(new JDBCEnqueueRecord(m, queue));
-        } else
+        }
+        else
         {
             try
             {
                 if (tx != null)
                 {
                     connection = tx.getConnection();
-                } else
+                }
+                else
                 {
                     connection = (MyConnection) _connectionPool.acquireInstance();
                 }
@@ -634,7 +691,7 @@
                 if (pstmt == null)
                 {
                     pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueueMessageRelation +
-                            " (QueueID,MessageID,Prepared) VALUES (?,?,0)");
+                                                                        " (QueueID,MessageID,Prepared) VALUES (?,?,0)");
                     connection.getStatements()[ENQUEUE] = pstmt;
                 }
                 pstmt.setInt(1, queue.getQueueID());
@@ -642,10 +699,12 @@
                 pstmt.executeUpdate();
                 m.enqueue(queue);
                 queue.enqueue(m);
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
-            } finally
+            }
+            finally
             {
                 if (tx == null && connection != null)
                 {
@@ -653,7 +712,8 @@
                     {
                         connection.getConnection().commit();
                         _connectionPool.releaseInstance(connection);
-                    } catch (SQLException e)
+                    }
+                    catch (SQLException e)
                     {
                         // we did not manage to commit this connection
                         // it is better to release it
@@ -680,14 +740,16 @@
         {
             // add an dequeue record
             tx.addRecord(new JDBCDequeueRecord(m, queue));
-        } else
+        }
+        else
         {
             try
             {
                 if (tx != null)
                 {
                     connection = tx.getConnection();
-                } else
+                }
+                else
                 {
                     connection = (MyConnection) _connectionPool.acquireInstance();
                 }
@@ -695,7 +757,7 @@
                 if (pstmt == null)
                 {
                     pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueueMessageRelation +
-                            " WHERE QueueID = ? AND MessageID = ?");
+                                                                        " WHERE QueueID = ? AND MessageID = ?");
                     connection.getStatements()[DEQUEUE] = pstmt;
                 }
                 pstmt.setInt(1, queue.getQueueID());
@@ -708,10 +770,12 @@
                     destroy(connection, m);
                 }
                 queue.dequeue(m);
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
-            } finally
+            }
+            finally
             {
                 if (tx == null && connection != null)
                 {
@@ -719,7 +783,8 @@
                     {
                         connection.getConnection().commit();
                         _connectionPool.releaseInstance(connection);
-                    } catch (SQLException e)
+                    }
+                    catch (SQLException e)
                     {
                         // we did not manage to commit this connection
                         // it is better to release it
@@ -756,14 +821,16 @@
                     queueOwner = new AMQShortString(rs.getString(3));
                 }
                 result.add(new AMQQueue(new AMQShortString(rs.getString(2)), true, queueOwner,
-                        false, _virtualHost));
+                                        false, _virtualHost));
             }
             rs.close();
             return result;
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot get all queues", e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -771,7 +838,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -791,10 +859,12 @@
         {
             connection = (MyConnection) _connectionPool.acquireInstance();
             return getAllMessages(connection, queue);
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot get all queues", e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -802,7 +872,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -821,7 +892,7 @@
         HashMap<Xid, Transaction> result = new HashMap<Xid, Transaction>();
         try
         {
-            TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+            //TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
             MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
             // re-create all the tx
             connection = (MyConnection) _connectionPool.acquireInstance();
@@ -840,11 +911,11 @@
                 }
                 foundTx = new JDBCTransaction();
                 foundXid = new XidImpl(rs.getBlob(3).getBytes(1, (int) rs.getBlob(3).length()),
-                        rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length()));
+                                       rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length()));
                 // get all the records
                 Statement stmtr = connection.getConnection().createStatement();
                 ResultSet rsr = stmtr.executeQuery("SELECT * FROM " + _tableNameRecord +
-                        " WHERE XID_ID = " + rs.getLong(1));
+                                                   " WHERE XID_ID = " + rs.getLong(1));
                 int foundType;
                 AMQQueue foundQueue;
                 StorableMessage foundMessage;
@@ -854,11 +925,14 @@
                     // those messages were not recovered before so they need to be recreated
                     foundType = rsr.getInt(2);
                     foundQueue = _queueMap.get(new Integer(rsr.getInt(4)));
-                    foundMessage = new AMQMessage(rs.getLong(3), this, messageHandleFactory, txnContext);
+
+                    //DTX MessageStore - this -> null , txContext -> null
+                    foundMessage = new AMQMessage(rs.getLong(3), null, messageHandleFactory, null);
                     if (foundType == JDBCAbstractRecord.TYPE_DEQUEUE)
                     {
                         foundRecord = new JDBCDequeueRecord(foundMessage, foundQueue);
-                    } else
+                    }
+                    else
                     {
                         foundRecord = new JDBCEnqueueRecord(foundMessage, foundQueue);
                     }
@@ -870,10 +944,12 @@
             }
             rs.close();
             return result;
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot recover: ", e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -881,7 +957,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -917,7 +994,8 @@
         {
             connection.getConnection().commit();
             _connectionPool.releaseInstance(connection);
-        } catch (SQLException e)
+        }
+        catch (SQLException e)
         {
             // we did not manage to commit this connection
             // it is better to release it
@@ -934,7 +1012,8 @@
         {
             connection.getConnection().rollback();
             _connectionPool.releaseInstance(connection);
-        } catch (SQLException e)
+        }
+        catch (SQLException e)
         {
             // we did not manage to rollback this connection
             // it is better to release it
@@ -952,7 +1031,7 @@
         if (pstmt == null)
         {
             pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
-                    " WHERE MessageID = ? ");
+                                                                " WHERE MessageID = ? ");
             connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
         }
         pstmt.setLong(1, m.getMessageId());
@@ -960,14 +1039,15 @@
         if (!rs.next())
         {
             throw new MessageDoesntExistException("Cannot append content of message Id "
-                    + m.getMessageId() + " as it has not been found");
+                                                  + m.getMessageId() + " as it has not been found");
         }
         Blob myBlob = rs.getBlob(1);
         byte[] oldPayload;
         if (myBlob != null && myBlob.length() > 0)
         {
             oldPayload = myBlob.getBytes(1, (int) myBlob.length());
-        } else
+        }
+        else
         {
             oldPayload = new byte[0];
         }
@@ -980,7 +1060,7 @@
         if (pstmtUpdate == null)
         {
             pstmtUpdate = connection.getConnection().prepareStatement("UPDATE " + _tableNameMessage +
-                    " SET Payload = ? WHERE MessageID = ?");
+                                                                      " SET Payload = ? WHERE MessageID = ?");
             connection.getStatements()[UPDATE_MESSAGE_PAYLOAD] = pstmtUpdate;
         }
         pstmtUpdate.setBytes(1, newPayload);
@@ -996,7 +1076,7 @@
         if (pstmt == null)
         {
             pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameMessage +
-                    " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)");
+                                                                " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)");
             connection.getStatements()[STAGE_MESSAGE] = pstmt;
         }
         pstmt.setLong(1, m.getMessageId());
@@ -1019,7 +1099,7 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameRecord +
-                        " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)");
+                                                                    " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)");
                 connection.getStatements()[SAVE_RECORD] = pstmt;
             }
             pstmt.setLong(1, tx.getXidID());
@@ -1027,7 +1107,8 @@
             pstmt.setLong(3, record.getMessageID());
             pstmt.setLong(4, record.getQueueID());
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot save record: " + record, e);
         }
@@ -1043,7 +1124,7 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameTransaction +
-                        " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)");
+                                                                    " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)");
                 connection.getStatements()[SAVE_XID] = pstmt;
             }
             pstmt.setLong(1, tx.getXidID());
@@ -1051,7 +1132,8 @@
             pstmt.setBytes(3, xid.getBranchQualifier());
             pstmt.setBytes(4, xid.getGlobalTransactionId());
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot save xid: " + xid, e);
         }
@@ -1067,12 +1149,13 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameRecord +
-                        " WHERE XID_ID = ?");
+                                                                    " WHERE XID_ID = ?");
                 connection.getStatements()[DELETE_RECORD] = pstmt;
             }
             pstmt.setLong(1, tx.getXidID());
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot delete record: " + tx.getXidID(), e);
         }
@@ -1088,12 +1171,13 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameTransaction +
-                        " WHERE XID_ID = ?");
+                                                                    " WHERE XID_ID = ?");
                 connection.getStatements()[DELETE_XID] = pstmt;
             }
             pstmt.setLong(1, tx.getXidID());
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot delete xid: " + tx.getXidID(), e);
         }
@@ -1142,14 +1226,15 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("UPDATE " + _tableNameQueueMessageRelation +
-                        " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?");
+                                                                    " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?");
                 connection.getStatements()[UPDATE_QMR] = pstmt;
             }
             pstmt.setInt(1, prepared);
             pstmt.setLong(2, messageId);
             pstmt.setInt(3, queueID);
             pstmt.executeUpdate();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot update QMR", e);
         }
@@ -1169,8 +1254,8 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("SELECT ExchangeName, RoutingKey," +
-                        " Mandatory, Is_Immediate from " + _tableNameMessage +
-                        " WHERE MessageID = ?");
+                                                                    " Mandatory, Is_Immediate from " + _tableNameMessage +
+                                                                    " WHERE MessageID = ?");
                 connection.getStatements()[GET_MESSAGE_INFO] = pstmt;
             }
             pstmt.setLong(1, m.getMessageId());
@@ -1204,16 +1289,19 @@
                         return routingKey;
                     }
                 };
-            } else
+            }
+            else
             {
                 throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
             }
             rs.close();
             return result;
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -1221,7 +1309,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -1245,7 +1334,7 @@
             if (pstmt == null)
             {
                 pstmt = connection.getConnection().prepareStatement("SELECT Header from " + _tableNameMessage +
-                        " WHERE MessageID = ?");
+                                                                    " WHERE MessageID = ?");
                 connection.getStatements()[GET_CONTENT_HEADER] = pstmt;
             }
             pstmt.setLong(1, m.getMessageId());
@@ -1253,16 +1342,19 @@
             if (rs.next())
             {
                 result = new ContentHeaderBody(ByteBuffer.wrap(rs.getBlob(1).getBytes(1, (int) rs.getBlob(1).length())), 0);
-            } else
+            }
+            else
             {
                 throw new InternalErrorException("Cannot get Content Header of message: " + m);
             }
             rs.close();
             return result;
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot get Content Header of message: " + m, e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -1270,7 +1362,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -1287,21 +1380,21 @@
             AMQException
     {
         List<StorableMessage> result = new ArrayList<StorableMessage>();
-        TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+//        TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
         MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
         PreparedStatement pstmt = connection.getStatements()[GET_ALL_MESSAGES];
         if (pstmt == null)
         {
             pstmt = connection.getConnection().prepareStatement("SELECT " + _tableNameMessage + ".MessageID, Header FROM " +
-                    _tableNameMessage +
-                    " INNER JOIN " +
-                    _tableNameQueueMessageRelation +
-                    " ON " +
-                    _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" +
-                    " WHERE " +
-                    _tableNameQueueMessageRelation + ".QueueID = ?" +
-                    " AND " +
-                    _tableNameQueueMessageRelation + ".Prepared = 0");
+                                                                _tableNameMessage +
+                                                                " INNER JOIN " +
+                                                                _tableNameQueueMessageRelation +
+                                                                " ON " +
+                                                                _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" +
+                                                                " WHERE " +
+                                                                _tableNameQueueMessageRelation + ".QueueID = ?" +
+                                                                " AND " +
+                                                                _tableNameQueueMessageRelation + ".Prepared = 0");
             connection.getStatements()[GET_ALL_MESSAGES] = pstmt;
         }
         pstmt.setInt(1, queue.getQueueID());
@@ -1310,7 +1403,10 @@
         // ContentHeaderBody hb;
         while (rs.next())
         {
-            foundMessage = new AMQMessage(rs.getLong(1), this, messageHandleFactory, txnContext);
+
+            //DTX MessageStore - this -> null , txContext -> null
+            foundMessage = new AMQMessage(rs.getLong(1), null, messageHandleFactory, null);
+            
             result.add(foundMessage);
         }
         rs.close();
@@ -1340,7 +1436,7 @@
                     owner = new AMQShortString(rs.getString(3));
                 }
                 foundQueue = new AMQQueue(new AMQShortString(rs.getString(2)),
-                        true, owner, false, _virtualHost);
+                                          true, owner, false, _virtualHost);
                 // get all the Messages of that queue
                 foundMessages = getAllMessages(connection, foundQueue);
                 // enqueue those messages
@@ -1350,7 +1446,7 @@
                 }
                 for (StorableMessage foundMessage : foundMessages)
                 {
-                    foundMessage.staged();                    
+                    foundMessage.staged();
                     foundMessage.enqueue(foundQueue);
                     foundQueue.enqueue(foundMessage);
                     foundQueue.process(context, (AMQMessage) foundMessage, false);
@@ -1362,10 +1458,12 @@
             }
             rs.close();
             return result;
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot recover: ", e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -1373,7 +1471,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -1404,7 +1503,7 @@
                 // get all the bindings
                 Statement stmtb = connection.getConnection().createStatement();
                 ResultSet rsb = stmtb.executeQuery("SELECT * FROM " + _tableNameExchangeQueueRelation +
-                        " WHERE Name = '" + rs.getString(1) + "'");
+                                                   " WHERE Name = '" + rs.getString(1) + "'");
                 while (rsb.next())
                 {
                     foundQueue = queueMap.get(new Integer(rsb.getInt(1)));
@@ -1426,10 +1525,12 @@
                 _virtualHost.getExchangeRegistry().registerExchange(foundExchange);
             }
             rs.close();
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new InternalErrorException("Cannot recover: ", e);
-        } finally
+        }
+        finally
         {
             if (connection != null)
             {
@@ -1437,7 +1538,8 @@
                 {
                     connection.getConnection().commit();
                     _connectionPool.releaseInstance(connection);
-                } catch (SQLException e)
+                }
+                catch (SQLException e)
                 {
                     // we did not manage to commit this connection
                     // it is better to release it
@@ -1456,7 +1558,7 @@
         if (pstmt == null)
         {
             pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameMessage +
-                    " WHERE MessageID = ?");
+                                                                " WHERE MessageID = ?");
             connection.getStatements()[DELETE_MESSAGE] = pstmt;
         }
         pstmt.setLong(1, m.getMessageId());
@@ -1589,8 +1691,8 @@
             try
             {
                 stmt.executeUpdate("CREATE TABLE " + _tableNameMessage + " (MessageID FLOAT NOT NULL, Header BLOB," +
-                        " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," +
-                        " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))");
+                                   " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," +
+                                   " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))");
                 myconnection._connection.commit();
             }
             catch (SQLException ex)
@@ -1602,7 +1704,7 @@
             try
             {
                 stmt.executeUpdate("CREATE TABLE " + _tableNameQueue + " (QueueID INTEGER NOT NULL, " +
-                        "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))");
+                                   "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))");
                 myconnection._connection.commit();
             }
             catch (SQLException ex)
@@ -1614,7 +1716,7 @@
             try
             {
                 stmt.executeUpdate("CREATE TABLE " + _tableNameQueueMessageRelation + " (QueueID INTEGER NOT NULL, " +
-                        "MessageID FLOAT NOT NULL, Prepared INTEGER)");
+                                   "MessageID FLOAT NOT NULL, Prepared INTEGER)");
                 myconnection._connection.commit();
             }
             catch (SQLException ex)
@@ -1625,7 +1727,7 @@
             try
             {
                 stmt.executeUpdate("CREATE TABLE " + _tableNameExchange + " (Name VARCHAR(1024) NOT NULL, " +
-                        "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))");
+                                   "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))");
                 myconnection._connection.commit();
             }
             catch (SQLException ex)
@@ -1636,7 +1738,7 @@
             try
             {
                 stmt.executeUpdate("CREATE TABLE " + _tableNameExchangeQueueRelation + " (QueueID INTEGER NOT NULL, " +
-                        "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )");
+                                   "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )");
                 myconnection._connection.commit();
             }
             catch (SQLException ex)
@@ -1647,7 +1749,7 @@
             try
             {
                 stmt.executeUpdate("CREATE TABLE " + _tableNameRecord + " (XID_ID FLOAT, Type INTEGER, MessageID FLOAT, " +
-                        "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))");
+                                   "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))");
                 // we could alter the table with QueueID as foreign key
                 myconnection._connection.commit();
             }
@@ -1659,7 +1761,7 @@
             try
             {
                 stmt.executeUpdate("CREATE TABLE " + _tableNameTransaction + " (XID_ID FLOAT, FormatId INTEGER, " +
-                        "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))");
+                                   "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))");
                 myconnection._connection.commit();
             }
             catch (SQLException ex)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java Thu Aug 30 05:19:31 2007
@@ -18,15 +18,23 @@
  */
 package org.apache.qpid.server.messageStore;
 
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.InvalidXidException;
+import org.apache.qpid.server.exception.MessageAlreadyStagedException;
+import org.apache.qpid.server.exception.MessageDoesntExistException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
+import org.apache.qpid.server.exception.QueueDoesntExistException;
+import org.apache.qpid.server.exception.UnknownXidException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.transaction.xa.Xid;
 import java.util.Collection;
@@ -203,7 +211,7 @@
      * @throws InternalErrorException      In case of internal message store problem
      * @throws MessageDoesntExistException If the message does not exist
      */
-      public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+    public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
             throws
             InternalErrorException,
             MessageDoesntExistException;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Aug 30 05:19:31 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,9 @@
 import org.apache.qpid.server.transport.ConnectorConfiguration;
 import org.apache.qpid.ssl.SSLContextFactory;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 /**
  * The protocol handler handles "protocol events" for all connections. The state
  * associated with an individual connection is accessed through the protocol session.
@@ -80,12 +83,12 @@
         final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
 
         createSession(protocolSession, _applicationRegistry, codecFactory);
-        _logger.info("Protocol session created");
+        _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
 
         final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
 
-        ConnectorConfiguration connectorConfig =
-            ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class);
+        ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
+                getConfiguredObject(ConnectorConfiguration.class);
         if (connectorConfig.enableExecutorPool)
         {
             if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession))
@@ -95,7 +98,7 @@
                 String certType = connectorConfig.certType;
                 SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
                 protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
-                    new SSLFilter(sslContextFactory.buildServerContext()));
+                                                          new SSLFilter(sslContextFactory.buildServerContext()));
             }
 
             protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
@@ -119,22 +122,21 @@
     /**
      * Separated into its own, protected, method to allow easier reuse
      */
-    protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec)
-        throws AMQException
+    protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
     {
         new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
     }
 
     public void sessionOpened(IoSession protocolSession) throws Exception
     {
-        _logger.info("Session opened");
+        _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
     }
 
     public void sessionClosed(IoSession protocolSession) throws Exception
     {
-        _logger.info("Protocol Session closed");
+        _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
         final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
-        // fixme  -- this can be null
+        //fixme  -- this can be null
         if (amqProtocolSession != null)
         {
             amqProtocolSession.closeSession();
@@ -143,15 +145,15 @@
 
     public void sessionIdle(IoSession session, IdleStatus status) throws Exception
     {
-        _logger.debug("Protocol Session [" + this + "] idle: " + status);
+        _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
         if (IdleStatus.WRITER_IDLE.equals(status))
         {
-            // write heartbeat frame:
+            //write heartbeat frame:
             session.write(HeartbeatBody.FRAME);
         }
         else if (IdleStatus.READER_IDLE.equals(status))
         {
-            // failover:
+            //failover:
             throw new IOException("Timed out while waiting for heartbeat from peer.");
         }
 
@@ -167,7 +169,7 @@
 
             protocolSession.close();
 
-            _logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable);
+            _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
         }
         else if (throwable instanceof IOException)
         {
@@ -178,13 +180,14 @@
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
 
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(),
-                    session.getProtocolMinorVersion(), // AMQP version (major, minor)
-                    0, // classId
-                    0, // methodId
-                    200, // replyCode
-                    new AMQShortString(throwable.getMessage()) // replyText
-                ));
+            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+                                                                     session.getProtocolMajorVersion(),
+                                                                     session.getProtocolMinorVersion(),    // AMQP version (major, minor)
+                                                                     0,    // classId
+                                                                     0,    // methodId
+                                                                     200,    // replyCode
+                                                                     new AMQShortString(throwable.getMessage())    // replyText
+            ));
             protocolSession.close();
         }
     }
@@ -203,6 +206,7 @@
         if (message instanceof AMQDataBlock)
         {
             amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
+                        
         }
         else if (message instanceof ByteBuffer)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Aug 30 05:19:31 2007
@@ -20,17 +20,8 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/** Combines the information that make up a deliverable message into a more manageable form. */
-
 import org.apache.log4j.Logger;
-
 import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
@@ -40,22 +31,33 @@
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.messageStore.StorableMessage;
 import org.apache.qpid.server.messageStore.StorableQueue;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Combines the information that make up a deliverable message into a more manageable form.
  */
 public class AMQMessage implements StorableMessage
 {
+    /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
-    // The ordered list of queues into which this message is enqueued.
+    // The ordered list of queues into which this message is enqueued.    
     private List<StorableQueue> _queues = new LinkedList<StorableQueue>();
     // Indicates whether this message is staged
     private boolean _isStaged = false;
@@ -66,7 +68,7 @@
     private Set<Object> _tokens;
 
     /**
-     * Only use in clustering - should ideally be removed?
+     * Only use in clustering - //todo: should ideally be removed?
      */
     private AMQProtocolSession _publisher;
 
@@ -76,12 +78,13 @@
 
     private AMQMessageHandle _messageHandle;
 
+    /** Holds the transactional context in which this message is being processed. */
     // TODO: ideally this should be able to go into the transient message data - check this! (RG)
     private TransactionalContext _txnContext;
 
     /**
-     * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
-     * messages published with the 'immediate' flag.
+     * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+     * for messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
     /**
@@ -89,18 +92,25 @@
      * checkDeliveredToConsumer is called, the message may already have been received and acknowledged, and the body
      * removed from the store.
      */
+
+    /** Flag to indicate that this message requires 'immediate' delivery. */
     private boolean _immediate;
 
     // private Subscription _takenBySubcription;
     // private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
+    //todo: this should be part of a messageOnQueue object
     private Set<Subscription> _rejectedBy = null;
 
+    //todo: this should be part of a messageOnQueue object
     private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+    //todo: this should be part of a messageOnQueue object
     private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
     private final int hashcode = System.identityHashCode(this);
+
+    //todo: this should be part of a messageOnQueue object
     private long _expiration;
 
     public String debugIdentity()
@@ -111,9 +121,9 @@
     public void setExpiration()
     {
         long expiration =
-            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+                ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
         long timestamp =
-            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+                ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
 
         if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
         {
@@ -176,8 +186,8 @@
             {
 
                 AMQBody cb =
-                    getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
-                            _messageId, ++_index));
+                        getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+                                                                                                         _messageId, ++_index));
 
                 return new AMQFrame(_channel, cb);
             }
@@ -259,10 +269,11 @@
      * @param messageId
      * @param store
      * @param factory
+     *
      * @throws AMQException
      */
     public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
-        throws AMQException
+            throws AMQException
     {
         _messageId = messageId;
         _messageHandle = factory.createMessageHandle(store, this, true);
@@ -279,7 +290,7 @@
      * @param contentHeader
      */
     public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
-        ContentHeaderBody contentHeader) throws AMQException
+                      ContentHeaderBody contentHeader) throws AMQException
     {
         this(messageId, info, txnContext);
         setContentHeaderBody(contentHeader);
@@ -294,11 +305,12 @@
      * @param contentHeader
      * @param destinationQueues
      * @param contentBodies
+     *
      * @throws AMQException
      */
     public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
-        ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
-        MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
+                      ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+                      MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
     {
         this(messageId, info, txnContext, contentHeader);
         _transientMessageData.setDestinationQueues(destinationQueues);
@@ -443,22 +455,23 @@
     }
 
     public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
-        throws AMQException
+            throws AMQException
     {
         final boolean persistent = isPersistent();
         _messageHandle = factory.createMessageHandle(store, this, persistent);
-        // if (persistent)
-        // {
-        _txnContext.beginTranIfNecessary();
-        // }
+        if (persistent)  //DTX was removed
+        {
+            _txnContext.beginTranIfNecessary();
+        }
 
         // enqueuing the messages ensure that if required the destinations are recorded to a
         // persistent store
 
-        // for (AMQQueue q : _transientMessageData.getDestinationQueues())
-        // {
-        // _messageHandle.enqueue(storeContext, _messageId, q);
-        // }
+        //DTX was removed
+        for (AMQQueue q : _transientMessageData.getDestinationQueues())
+        {
+            _messageHandle.enqueue(storeContext, _messageId, q);
+        }
 
         if (_transientMessageData.getContentHeaderBody().bodySize == 0)
         {
@@ -494,11 +507,12 @@
      */
     public AMQMessage takeReference()
     {
-        _referenceCount.incrementAndGet();
+        incrementReference(); // _referenceCount.incrementAndGet();
 
         return this;
     }
 
+
     /**
      * Threadsafe. Increment the reference count on the message.
      */
@@ -516,6 +530,7 @@
      * message store.
      *
      * @param storeContext
+     *
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
      */
@@ -555,7 +570,7 @@
             if (count < 0)
             {
                 throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.",
-                    null);
+                                                  null);
             }
         }
     }
@@ -684,6 +699,7 @@
      * AMQMessageHandle implementation can be picked based on various criteria.
      *
      * @param queue the queue
+     *
      * @throws org.apache.qpid.AMQException if there is an error enqueuing the message
      */
     public void enqueue(AMQQueue queue) throws AMQException
@@ -756,14 +772,13 @@
     /**
      * Checks to see if the message has expired. If it has the message is dequeued.
      *
-     * @param storecontext
-     * @param queue
+     * @param queue The queue to check the expiration against. (Currently not used)
      *
      * @return true if the message has expire
      *
      * @throws AMQException
      */
-    public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+    public boolean expired(AMQQueue queue) throws AMQException
     {
         // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
 
@@ -771,12 +786,7 @@
         {
             long now = System.currentTimeMillis();
 
-            if (now > _expiration)
-            {
-                dequeue(storecontext, queue);
-
-                return true;
-            }
+            return (now > _expiration);
         }
 
         return false;
@@ -803,7 +813,7 @@
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
             _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
-                _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
+                                                          _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
 
             // we then allow the transactional context to do something with the message content
             // now that it has all been received, before we attempt delivery
@@ -1039,7 +1049,7 @@
         // _taken + " by :" + _takenBySubcription;
 
         return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
-            + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+               + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
     public Subscription getDeliveredSubscription(AMQQueue queue)
@@ -1053,6 +1063,7 @@
 
     public void reject(Subscription subscription)
     {
+
         if (subscription != null)
         {
             if (_rejectedBy == null)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Aug 30 05:19:31 2007
@@ -20,38 +20,26 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.JMException;
-
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exception.InternalErrorException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.messageStore.StorableMessage;
 import org.apache.qpid.server.messageStore.StorableQueue;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.management.JMException;
-
 import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -65,6 +53,49 @@
  */
 public class AMQQueue implements Managable, Comparable, StorableQueue
 {
+    //FROM M2 - think these have been replaced by *Exception in the broker exception package
+//    /**
+//     * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
+//     * already exists.
+//     *
+//     * <p/><table id="crc"><caption>CRC Card</caption>
+//     * <tr><th> Responsibilities <th> Collaborations
+//     * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+//     * </table>
+//     *
+//     * @todo Not an AMQP exception as no status code.
+//     *
+//     * @todo Move to top level, used outside this class.
+//     */
+//    public static final class ExistingExclusiveSubscription extends AMQException
+//    {
+//
+//        public ExistingExclusiveSubscription()
+//        {
+//            super("");
+//        }
+//    }
+//
+//    /**
+//     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
+//     * already exists.
+//     *
+//     * <p/><table id="crc"><caption>CRC Card</caption>
+//     * <tr><th> Responsibilities <th> Collaborations
+//     * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
+//     * </table>
+//     *
+//     * @todo Not an AMQP exception as no status code.
+//     *
+//     * @todo Move to top level, used outside this class.
+//     */
+//    public static final class ExistingSubscriptionPreventsExclusive extends AMQException
+//    {
+//        public ExistingSubscriptionPreventsExclusive()
+//        {
+//            super("");
+    //        }
+    //    }
     public static int s_queueID = 0;
 
     private static final Logger _logger = Logger.getLogger(AMQQueue.class);
@@ -163,22 +194,22 @@
     }
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
-        throws AMQException
+            throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
-            new SubscriptionSet(), new SubscriptionImpl.Factory());
+             new SubscriptionSet(), new SubscriptionImpl.Factory());
     }
 
     protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
-        VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
+                       VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
-            new SubscriptionImpl.Factory());
+             new SubscriptionImpl.Factory());
     }
 
     protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
-        VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
-        SubscriptionFactory subscriptionFactory) throws AMQException
+                       VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
+                       SubscriptionFactory subscriptionFactory) throws AMQException
     {
         if (name == null)
         {
@@ -298,32 +329,217 @@
      * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
      * locks from both queues and start async delivery
      *
-     * @param fromMessageId
-     * @param toMessageId
-     * @param queueName
-     * @param storeContext
+     * @param fromMessageId The first message id to move.
+     * @param toMessageId   The last message id to move.
+     * @param queueName     The queue to move the messages to.
+     * @param storeContext  The context of the message store under which to perform the move. This is associated with
+     *                      the stores transactional context.
      */
     public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-        StoreContext storeContext)
+                                                        StoreContext storeContext)
+    {
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); // NOTE(review): dereferenced below without a null check; confirm how getQueue reports an unknown queue name.
+
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+        if (toStore != fromStore)
+        {
+            throw new RuntimeException("Can only move messages between queues on the same message store.");
+        }
+
+        try
+        {
+            // Obtain locks to prevent activity on the queues being moved between.
+            startMovingMessages();
+            toQueue.startMovingMessages();
+
+            // Get the list of messages to move.
+            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // Move the messages on the message store: dequeue from this queue, enqueue on the target queue.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+                    toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+                }
+
+                // Commit the move transaction. A commit failure is rethrown unchecked, so it bypasses the abort handler below.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+                }
+
+                // Move the messages on the in-memory queues, only after the store transaction has committed.
+                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+                _deliveryMgr.removeMovedMessages(foundMessagesList);
+            }
+            // Abort the move transaction on failure. NOTE(review): the AMQException is then dropped, so callers are not told the move failed.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+                }
+            }
+        }
+        // Release locks to allow activity on the queues being moved between to continue.
+        finally
+        {
+            toQueue.stopMovingMessages();
+            stopMovingMessages();
+        }
+    }
+
+    /**
+     * Copies messages on this queue to another queue, and also commits the copy on the message store. Delivery activity
+     * on both queues is suspended during the copy. Both queues must be backed by the same message store.
+     *
+     * @param fromMessageId The first message id to copy.
+     * @param toMessageId   The last message id to copy.
+     * @param queueName     The name of the queue to copy the messages to.
+     * @param storeContext  The context of the message store under which to perform the copy. This is associated with
+     *                      the store's transactional context.
+     */
+    public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+                                                        StoreContext storeContext)
+    {
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); // NOTE(review): dereferenced below without a null check; confirm how getQueue reports an unknown queue name.
+
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+        if (toStore != fromStore)
+        {
+            throw new RuntimeException("Can only move messages between queues on the same message store.");
+        }
+
+        try
+        {
+            // Obtain locks to prevent activity on the queues being copied between.
+            startMovingMessages();
+            toQueue.startMovingMessages();
+
+            // Get the list of messages to copy.
+            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // Enqueue each message on the target queue in the message store and take an extra reference on it.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+                    message.takeReference();
+                }
+
+                // Commit the copy transaction. A commit failure is rethrown unchecked, so it bypasses the abort handler below.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+                }
+
+                // Enqueue the messages on the target in-memory queue; nothing is removed from this queue.
+                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+            }
+            // Abort the copy transaction on failure. NOTE(review): the AMQException is then dropped, so callers are not told the copy failed.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+                }
+            }
+        }
+        // Release locks to allow activity on both queues to continue.
+        finally
+        {
+            toQueue.stopMovingMessages();
+            stopMovingMessages();
+        }
+    }
+
+    /**
+     * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
+     * on this queue is suspended during the remove.
+     *
+     * @param fromMessageId The first message id to remove.
+     * @param toMessageId   The last message id to remove.
+     * @param storeContext  The context of the message store under which to perform the remove. This is associated with
+     *                      the store's transactional context.
+     */
+    public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
     {
-        // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
-        AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+
         try
         {
+            // Obtain the lock to prevent activity on this queue while messages are removed.
             startMovingMessages();
+
+            // Get the list of messages to remove.
             List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
-            // move messages to another queue
-            anotherQueue.startMovingMessages();
-            anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // Remove the messages from this queue on the message store.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+                }
+
+                // Commit the remove transaction. A commit failure is rethrown unchecked, so it bypasses the abort handler below.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+                }
 
-            // moving is successful, now remove from original queue
-            _deliveryMgr.removeMovedMessages(foundMessagesList);
+                // Remove the messages from the in-memory queue, only after the store transaction has committed.
+                _deliveryMgr.removeMovedMessages(foundMessagesList);
+            }
+            // Abort the remove transaction on failure. NOTE(review): the AMQException is then dropped, so callers are not told the remove failed.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+                }
+            }
         }
+        // Release the lock to allow activity on this queue to continue.
         finally
         {
-            // remove the lock and start the async delivery
-            anotherQueue.stopMovingMessages();
             stopMovingMessages();
         }
     }
@@ -426,14 +642,16 @@
         exchange.registerQueue(routingKey, this, arguments);
         if (isDurable() && exchange.isDurable())
         {
-            try
-            {
-                _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
-            }
-            catch (InternalErrorException e)
-            {
-                throw new AMQException(null, "Problem binding queue ", e);
-            }
+            _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+            //DTX MessageStore
+//            try
+//            {
+//                _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+//            }
+//            catch (InternalErrorException e)
+//            {
+//                throw new AMQException(null, "Problem binding queue ", e);
+//            }
         }
 
         _bindings.addBinding(routingKey, arguments, exchange);
@@ -444,21 +662,24 @@
         exchange.deregisterQueue(routingKey, this, arguments);
         if (isDurable() && exchange.isDurable())
         {
-            try
-            {
-                _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
-            }
-            catch (InternalErrorException e)
-            {
-                throw new AMQException(null, "problem unbinding queue", e);
-            }
+
+            _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+            //DTX MessageStore
+//            try
+//            {
+//                _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+//            }
+//            catch (InternalErrorException e)
+//            {
+//                throw new AMQException(null, "problem unbinding queue", e);
+//            }
         }
 
         _bindings.remove(routingKey, arguments, exchange);
     }
 
     public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
-        FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+                                        FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
     {
         if (incrementSubscriberCount() > 1)
         {
@@ -487,7 +708,7 @@
         }
 
         Subscription subscription =
-            _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+                _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
 
         if (subscription.filtersMessages())
         {
@@ -532,11 +753,11 @@
         Subscription removedSubscription;
 
         if ((removedSubscription =
-                        _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
-                == null)
+                _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
+            == null)
         {
             throw new AMQException(null, "Protocol session with channel " + channel + " and consumer tag " + consumerTag
-                + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null);
+                                         + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null);
         }
 
         removedSubscription.close();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Aug 30 05:19:31 2007
@@ -18,30 +18,23 @@
  * under the License.
  *
  */
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
 package org.apache.qpid.server.queue;
 
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.store.StoreContext;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -60,30 +53,25 @@
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * MBean class for AMQQueue. It implements all the management features exposed
  * for an AMQQueue.
+ * <p/><table id="crc"><caption>CRC Caption</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
  */
 @MBeanDescription("Management Interface for AMQQueue")
 public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
 {
+    /** Used for debugging purposes. */
     private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
     private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
 
     /**