You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/08/30 14:19:42 UTC
svn commit: r571129 [3/15] - in /incubator/qpid/trunk/qpid/java: ./ broker/
broker/bin/ broker/distribution/src/main/assembly/ broker/etc/
broker/src/main/java/org/apache/log4j/
broker/src/main/java/org/apache/qpid/configuration/ broker/src/main/java/o...
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java Thu Aug 30 05:19:31 2007
@@ -17,29 +17,50 @@
*/
package org.apache.qpid.server.messageStore;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.InvalidXidException;
+import org.apache.qpid.server.exception.MessageAlreadyStagedException;
+import org.apache.qpid.server.exception.MessageDoesntExistException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
+import org.apache.qpid.server.exception.QueueDoesntExistException;
+import org.apache.qpid.server.exception.UnknownXidException;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.*;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.server.txn.JDBCAbstractRecord;
+import org.apache.qpid.server.txn.JDBCDequeueRecord;
+import org.apache.qpid.server.txn.JDBCEnqueueRecord;
+import org.apache.qpid.server.txn.JDBCTransaction;
+import org.apache.qpid.server.txn.JDBCTransactionManager;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.TransactionRecord;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.XidImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.transaction.xa.Xid;
-import java.util.Collection;
-import java.util.List;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.sql.*;
+import java.util.List;
/**
* Created by Arnaud Simon
@@ -173,16 +194,18 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchange +
- " (Name,Type) VALUES (?,?)");
+ " (Name,Type) VALUES (?,?)");
connection.getStatements()[CREATE_EXCHANGE] = pstmt;
}
pstmt.setString(1, exchange.getName().asString());
pstmt.setString(2, exchange.getType().asString());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -190,7 +213,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -216,15 +240,17 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchange +
- " WHERE Name = ?");
+ " WHERE Name = ?");
connection.getStatements()[DELETE_EXCHANGE] = pstmt;
}
pstmt.setString(1, exchange.getName().asString());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -232,7 +258,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -258,7 +285,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchangeQueueRelation +
- " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)");
+ " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)");
connection.getStatements()[BIND_QUEUE] = pstmt;
}
pstmt.setInt(1, queue.getQueueID());
@@ -267,15 +294,18 @@
if (args != null)
{
pstmt.setBytes(4, args.getDataAsBytes());
- } else
+ }
+ else
{
pstmt.setBytes(4, null);
}
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -283,7 +313,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -307,17 +338,19 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchangeQueueRelation +
- " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?");
+ " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?");
connection.getStatements()[UNBIND_QUEUE] = pstmt;
}
pstmt.setInt(1, queue.getQueueID());
pstmt.setString(2, exchange.getName().asString());
pstmt.setString(3, routingKey.asString());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -325,7 +358,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -349,7 +383,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueue +
- " (QueueID,Name,Owner) VALUES (?,?,?)");
+ " (QueueID,Name,Owner) VALUES (?,?,?)");
connection.getStatements()[CREATE_QUEUE] = pstmt;
}
pstmt.setInt(1, queue.getQueueID());
@@ -357,15 +391,18 @@
if (queue.getOwner() != null)
{
pstmt.setString(3, queue.getOwner().asString());
- } else
+ }
+ else
{
pstmt.setString(3, null);
}
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot create Queue: " + queue, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -373,7 +410,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -397,15 +435,17 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueue +
- " WHERE QueueID = ?");
+ " WHERE QueueID = ?");
connection.getStatements()[DELETE_QUEUE] = pstmt;
}
pstmt.setInt(1, queue.getQueueID());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot remove Queue: " + queue, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -413,7 +453,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -439,10 +480,12 @@
{
connection = (MyConnection) _connectionPool.acquireInstance();
stage(connection, m);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot stage Message: " + m, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -450,7 +493,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -470,19 +514,21 @@
if (!m.isStaged())
{
_log.error("Cannot append content of message Id "
- + m.getMessageId() + " as it has not been staged");
+ + m.getMessageId() + " as it has not been staged");
throw new MessageDoesntExistException("Cannot append content of message Id "
- + m.getMessageId() + " as it has not been staged");
+ + m.getMessageId() + " as it has not been staged");
}
MyConnection connection = null;
try
{
connection = (MyConnection) _connectionPool.acquireInstance();
appendContent(connection, m, data, offset, size);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot stage Message: " + m, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -490,7 +536,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -515,7 +562,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
- " WHERE MessageID = ? ");
+ " WHERE MessageID = ? ");
connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
}
pstmt.setLong(1, m.getMessageId());
@@ -523,7 +570,7 @@
if (!rs.next())
{
throw new MessageDoesntExistException("Cannot load content of message Id "
- + m.getMessageId() + " as it has not been found");
+ + m.getMessageId() + " as it has not been found");
}
Blob myBlob = rs.getBlob(1);
@@ -532,21 +579,25 @@
if (size == 0)
{
result = myBlob.getBytes(offset, (int) myBlob.length());
- } else
+ }
+ else
{
result = myBlob.getBytes(offset, size);
}
- } else
+ }
+ else
{
throw new MessageDoesntExistException("Cannot load content of message Id "
- + m.getMessageId() + " as it has not been found");
+ + m.getMessageId() + " as it has not been found");
}
rs.close();
return result;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot load Message: " + m, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -554,7 +605,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -575,10 +627,12 @@
{
connection = (MyConnection) _connectionPool.acquireInstance();
destroy(connection, m);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot destroy message: " + m, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -586,7 +640,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -613,14 +668,16 @@
{
// add an enqueue record
tx.addRecord(new JDBCEnqueueRecord(m, queue));
- } else
+ }
+ else
{
try
{
if (tx != null)
{
connection = tx.getConnection();
- } else
+ }
+ else
{
connection = (MyConnection) _connectionPool.acquireInstance();
}
@@ -634,7 +691,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueueMessageRelation +
- " (QueueID,MessageID,Prepared) VALUES (?,?,0)");
+ " (QueueID,MessageID,Prepared) VALUES (?,?,0)");
connection.getStatements()[ENQUEUE] = pstmt;
}
pstmt.setInt(1, queue.getQueueID());
@@ -642,10 +699,12 @@
pstmt.executeUpdate();
m.enqueue(queue);
queue.enqueue(m);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
- } finally
+ }
+ finally
{
if (tx == null && connection != null)
{
@@ -653,7 +712,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -680,14 +740,16 @@
{
// add an dequeue record
tx.addRecord(new JDBCDequeueRecord(m, queue));
- } else
+ }
+ else
{
try
{
if (tx != null)
{
connection = tx.getConnection();
- } else
+ }
+ else
{
connection = (MyConnection) _connectionPool.acquireInstance();
}
@@ -695,7 +757,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueueMessageRelation +
- " WHERE QueueID = ? AND MessageID = ?");
+ " WHERE QueueID = ? AND MessageID = ?");
connection.getStatements()[DEQUEUE] = pstmt;
}
pstmt.setInt(1, queue.getQueueID());
@@ -708,10 +770,12 @@
destroy(connection, m);
}
queue.dequeue(m);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
- } finally
+ }
+ finally
{
if (tx == null && connection != null)
{
@@ -719,7 +783,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -756,14 +821,16 @@
queueOwner = new AMQShortString(rs.getString(3));
}
result.add(new AMQQueue(new AMQShortString(rs.getString(2)), true, queueOwner,
- false, _virtualHost));
+ false, _virtualHost));
}
rs.close();
return result;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot get all queues", e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -771,7 +838,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -791,10 +859,12 @@
{
connection = (MyConnection) _connectionPool.acquireInstance();
return getAllMessages(connection, queue);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot get all queues", e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -802,7 +872,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -821,7 +892,7 @@
HashMap<Xid, Transaction> result = new HashMap<Xid, Transaction>();
try
{
- TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+ //TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
// re-create all the tx
connection = (MyConnection) _connectionPool.acquireInstance();
@@ -840,11 +911,11 @@
}
foundTx = new JDBCTransaction();
foundXid = new XidImpl(rs.getBlob(3).getBytes(1, (int) rs.getBlob(3).length()),
- rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length()));
+ rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length()));
// get all the records
Statement stmtr = connection.getConnection().createStatement();
ResultSet rsr = stmtr.executeQuery("SELECT * FROM " + _tableNameRecord +
- " WHERE XID_ID = " + rs.getLong(1));
+ " WHERE XID_ID = " + rs.getLong(1));
int foundType;
AMQQueue foundQueue;
StorableMessage foundMessage;
@@ -854,11 +925,14 @@
// those messages were not recovered before so they need to be recreated
foundType = rsr.getInt(2);
foundQueue = _queueMap.get(new Integer(rsr.getInt(4)));
- foundMessage = new AMQMessage(rs.getLong(3), this, messageHandleFactory, txnContext);
+
+ //DTX MessageStore - this -> null , txContext -> null
+ foundMessage = new AMQMessage(rs.getLong(3), null, messageHandleFactory, null);
if (foundType == JDBCAbstractRecord.TYPE_DEQUEUE)
{
foundRecord = new JDBCDequeueRecord(foundMessage, foundQueue);
- } else
+ }
+ else
{
foundRecord = new JDBCEnqueueRecord(foundMessage, foundQueue);
}
@@ -870,10 +944,12 @@
}
rs.close();
return result;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot recover: ", e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -881,7 +957,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -917,7 +994,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -934,7 +1012,8 @@
{
connection.getConnection().rollback();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to rollback this connection
// it is better to release it
@@ -952,7 +1031,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
- " WHERE MessageID = ? ");
+ " WHERE MessageID = ? ");
connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
}
pstmt.setLong(1, m.getMessageId());
@@ -960,14 +1039,15 @@
if (!rs.next())
{
throw new MessageDoesntExistException("Cannot append content of message Id "
- + m.getMessageId() + " as it has not been found");
+ + m.getMessageId() + " as it has not been found");
}
Blob myBlob = rs.getBlob(1);
byte[] oldPayload;
if (myBlob != null && myBlob.length() > 0)
{
oldPayload = myBlob.getBytes(1, (int) myBlob.length());
- } else
+ }
+ else
{
oldPayload = new byte[0];
}
@@ -980,7 +1060,7 @@
if (pstmtUpdate == null)
{
pstmtUpdate = connection.getConnection().prepareStatement("UPDATE " + _tableNameMessage +
- " SET Payload = ? WHERE MessageID = ?");
+ " SET Payload = ? WHERE MessageID = ?");
connection.getStatements()[UPDATE_MESSAGE_PAYLOAD] = pstmtUpdate;
}
pstmtUpdate.setBytes(1, newPayload);
@@ -996,7 +1076,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameMessage +
- " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)");
+ " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)");
connection.getStatements()[STAGE_MESSAGE] = pstmt;
}
pstmt.setLong(1, m.getMessageId());
@@ -1019,7 +1099,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameRecord +
- " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)");
+ " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)");
connection.getStatements()[SAVE_RECORD] = pstmt;
}
pstmt.setLong(1, tx.getXidID());
@@ -1027,7 +1107,8 @@
pstmt.setLong(3, record.getMessageID());
pstmt.setLong(4, record.getQueueID());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot save record: " + record, e);
}
@@ -1043,7 +1124,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameTransaction +
- " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)");
+ " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)");
connection.getStatements()[SAVE_XID] = pstmt;
}
pstmt.setLong(1, tx.getXidID());
@@ -1051,7 +1132,8 @@
pstmt.setBytes(3, xid.getBranchQualifier());
pstmt.setBytes(4, xid.getGlobalTransactionId());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot save xid: " + xid, e);
}
@@ -1067,12 +1149,13 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameRecord +
- " WHERE XID_ID = ?");
+ " WHERE XID_ID = ?");
connection.getStatements()[DELETE_RECORD] = pstmt;
}
pstmt.setLong(1, tx.getXidID());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot delete record: " + tx.getXidID(), e);
}
@@ -1088,12 +1171,13 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameTransaction +
- " WHERE XID_ID = ?");
+ " WHERE XID_ID = ?");
connection.getStatements()[DELETE_XID] = pstmt;
}
pstmt.setLong(1, tx.getXidID());
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot delete xid: " + tx.getXidID(), e);
}
@@ -1142,14 +1226,15 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("UPDATE " + _tableNameQueueMessageRelation +
- " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?");
+ " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?");
connection.getStatements()[UPDATE_QMR] = pstmt;
}
pstmt.setInt(1, prepared);
pstmt.setLong(2, messageId);
pstmt.setInt(3, queueID);
pstmt.executeUpdate();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot update QMR", e);
}
@@ -1169,8 +1254,8 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("SELECT ExchangeName, RoutingKey," +
- " Mandatory, Is_Immediate from " + _tableNameMessage +
- " WHERE MessageID = ?");
+ " Mandatory, Is_Immediate from " + _tableNameMessage +
+ " WHERE MessageID = ?");
connection.getStatements()[GET_MESSAGE_INFO] = pstmt;
}
pstmt.setLong(1, m.getMessageId());
@@ -1204,16 +1289,19 @@
return routingKey;
}
};
- } else
+ }
+ else
{
throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
}
rs.close();
return result;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -1221,7 +1309,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -1245,7 +1334,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("SELECT Header from " + _tableNameMessage +
- " WHERE MessageID = ?");
+ " WHERE MessageID = ?");
connection.getStatements()[GET_CONTENT_HEADER] = pstmt;
}
pstmt.setLong(1, m.getMessageId());
@@ -1253,16 +1342,19 @@
if (rs.next())
{
result = new ContentHeaderBody(ByteBuffer.wrap(rs.getBlob(1).getBytes(1, (int) rs.getBlob(1).length())), 0);
- } else
+ }
+ else
{
throw new InternalErrorException("Cannot get Content Header of message: " + m);
}
rs.close();
return result;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot get Content Header of message: " + m, e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -1270,7 +1362,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -1287,21 +1380,21 @@
AMQException
{
List<StorableMessage> result = new ArrayList<StorableMessage>();
- TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+// TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
PreparedStatement pstmt = connection.getStatements()[GET_ALL_MESSAGES];
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("SELECT " + _tableNameMessage + ".MessageID, Header FROM " +
- _tableNameMessage +
- " INNER JOIN " +
- _tableNameQueueMessageRelation +
- " ON " +
- _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" +
- " WHERE " +
- _tableNameQueueMessageRelation + ".QueueID = ?" +
- " AND " +
- _tableNameQueueMessageRelation + ".Prepared = 0");
+ _tableNameMessage +
+ " INNER JOIN " +
+ _tableNameQueueMessageRelation +
+ " ON " +
+ _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" +
+ " WHERE " +
+ _tableNameQueueMessageRelation + ".QueueID = ?" +
+ " AND " +
+ _tableNameQueueMessageRelation + ".Prepared = 0");
connection.getStatements()[GET_ALL_MESSAGES] = pstmt;
}
pstmt.setInt(1, queue.getQueueID());
@@ -1310,7 +1403,10 @@
// ContentHeaderBody hb;
while (rs.next())
{
- foundMessage = new AMQMessage(rs.getLong(1), this, messageHandleFactory, txnContext);
+
+ //DTX MessageStore - this -> null , txContext -> null
+ foundMessage = new AMQMessage(rs.getLong(1), null, messageHandleFactory, null);
+
result.add(foundMessage);
}
rs.close();
@@ -1340,7 +1436,7 @@
owner = new AMQShortString(rs.getString(3));
}
foundQueue = new AMQQueue(new AMQShortString(rs.getString(2)),
- true, owner, false, _virtualHost);
+ true, owner, false, _virtualHost);
// get all the Messages of that queue
foundMessages = getAllMessages(connection, foundQueue);
// enqueue those messages
@@ -1350,7 +1446,7 @@
}
for (StorableMessage foundMessage : foundMessages)
{
- foundMessage.staged();
+ foundMessage.staged();
foundMessage.enqueue(foundQueue);
foundQueue.enqueue(foundMessage);
foundQueue.process(context, (AMQMessage) foundMessage, false);
@@ -1362,10 +1458,12 @@
}
rs.close();
return result;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot recover: ", e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -1373,7 +1471,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -1404,7 +1503,7 @@
// get all the bindings
Statement stmtb = connection.getConnection().createStatement();
ResultSet rsb = stmtb.executeQuery("SELECT * FROM " + _tableNameExchangeQueueRelation +
- " WHERE Name = '" + rs.getString(1) + "'");
+ " WHERE Name = '" + rs.getString(1) + "'");
while (rsb.next())
{
foundQueue = queueMap.get(new Integer(rsb.getInt(1)));
@@ -1426,10 +1525,12 @@
_virtualHost.getExchangeRegistry().registerExchange(foundExchange);
}
rs.close();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new InternalErrorException("Cannot recover: ", e);
- } finally
+ }
+ finally
{
if (connection != null)
{
@@ -1437,7 +1538,8 @@
{
connection.getConnection().commit();
_connectionPool.releaseInstance(connection);
- } catch (SQLException e)
+ }
+ catch (SQLException e)
{
// we did not manage to commit this connection
// it is better to release it
@@ -1456,7 +1558,7 @@
if (pstmt == null)
{
pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameMessage +
- " WHERE MessageID = ?");
+ " WHERE MessageID = ?");
connection.getStatements()[DELETE_MESSAGE] = pstmt;
}
pstmt.setLong(1, m.getMessageId());
@@ -1589,8 +1691,8 @@
try
{
stmt.executeUpdate("CREATE TABLE " + _tableNameMessage + " (MessageID FLOAT NOT NULL, Header BLOB," +
- " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," +
- " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))");
+ " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," +
+ " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))");
myconnection._connection.commit();
}
catch (SQLException ex)
@@ -1602,7 +1704,7 @@
try
{
stmt.executeUpdate("CREATE TABLE " + _tableNameQueue + " (QueueID INTEGER NOT NULL, " +
- "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))");
+ "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))");
myconnection._connection.commit();
}
catch (SQLException ex)
@@ -1614,7 +1716,7 @@
try
{
stmt.executeUpdate("CREATE TABLE " + _tableNameQueueMessageRelation + " (QueueID INTEGER NOT NULL, " +
- "MessageID FLOAT NOT NULL, Prepared INTEGER)");
+ "MessageID FLOAT NOT NULL, Prepared INTEGER)");
myconnection._connection.commit();
}
catch (SQLException ex)
@@ -1625,7 +1727,7 @@
try
{
stmt.executeUpdate("CREATE TABLE " + _tableNameExchange + " (Name VARCHAR(1024) NOT NULL, " +
- "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))");
+ "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))");
myconnection._connection.commit();
}
catch (SQLException ex)
@@ -1636,7 +1738,7 @@
try
{
stmt.executeUpdate("CREATE TABLE " + _tableNameExchangeQueueRelation + " (QueueID INTEGER NOT NULL, " +
- "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )");
+ "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )");
myconnection._connection.commit();
}
catch (SQLException ex)
@@ -1647,7 +1749,7 @@
try
{
stmt.executeUpdate("CREATE TABLE " + _tableNameRecord + " (XID_ID FLOAT, Type INTEGER, MessageID FLOAT, " +
- "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))");
+ "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))");
// we could alter the table with QueueID as foreign key
myconnection._connection.commit();
}
@@ -1659,7 +1761,7 @@
try
{
stmt.executeUpdate("CREATE TABLE " + _tableNameTransaction + " (XID_ID FLOAT, FormatId INTEGER, " +
- "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))");
+ "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))");
myconnection._connection.commit();
}
catch (SQLException ex)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java Thu Aug 30 05:19:31 2007
@@ -18,15 +18,23 @@
*/
package org.apache.qpid.server.messageStore;
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.InvalidXidException;
+import org.apache.qpid.server.exception.MessageAlreadyStagedException;
+import org.apache.qpid.server.exception.MessageDoesntExistException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
+import org.apache.qpid.server.exception.QueueDoesntExistException;
+import org.apache.qpid.server.exception.UnknownXidException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.transaction.xa.Xid;
import java.util.Collection;
@@ -203,7 +211,7 @@
* @throws InternalErrorException In case of internal message store problem
* @throws MessageDoesntExistException If the message does not exist
*/
- public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+ public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
throws
InternalErrorException,
MessageDoesntExistException;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Aug 30 05:19:31 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,9 @@
import org.apache.qpid.server.transport.ConnectorConfiguration;
import org.apache.qpid.ssl.SSLContextFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
/**
* The protocol handler handles "protocol events" for all connections. The state
* associated with an individual connection is accessed through the protocol session.
@@ -80,12 +83,12 @@
final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
createSession(protocolSession, _applicationRegistry, codecFactory);
- _logger.info("Protocol session created");
+ _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
- ConnectorConfiguration connectorConfig =
- ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class);
+ ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
+ getConfiguredObject(ConnectorConfiguration.class);
if (connectorConfig.enableExecutorPool)
{
if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession))
@@ -95,7 +98,7 @@
String certType = connectorConfig.certType;
SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(sslContextFactory.buildServerContext()));
+ new SSLFilter(sslContextFactory.buildServerContext()));
}
protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
@@ -119,22 +122,21 @@
/**
* Separated into its own, protected, method to allow easier reuse
*/
- protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec)
- throws AMQException
+ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
{
new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
}
public void sessionOpened(IoSession protocolSession) throws Exception
{
- _logger.info("Session opened");
+ _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
}
public void sessionClosed(IoSession protocolSession) throws Exception
{
- _logger.info("Protocol Session closed");
+ _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- // fixme -- this can be null
+ //fixme -- this can be null
if (amqProtocolSession != null)
{
amqProtocolSession.closeSession();
@@ -143,15 +145,15 @@
public void sessionIdle(IoSession session, IdleStatus status) throws Exception
{
- _logger.debug("Protocol Session [" + this + "] idle: " + status);
+ _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
if (IdleStatus.WRITER_IDLE.equals(status))
{
- // write heartbeat frame:
+ //write heartbeat frame:
session.write(HeartbeatBody.FRAME);
}
else if (IdleStatus.READER_IDLE.equals(status))
{
- // failover:
+ //failover:
throw new IOException("Timed out while waiting for heartbeat from peer.");
}
@@ -167,7 +169,7 @@
protocolSession.close();
- _logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable);
+ _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
}
else if (throwable instanceof IOException)
{
@@ -178,13 +180,14 @@
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
// Be aware of possible changes to parameter order as versions change.
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(),
- session.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- 200, // replyCode
- new AMQShortString(throwable.getMessage()) // replyText
- ));
+ protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+ session.getProtocolMajorVersion(),
+ session.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ 200, // replyCode
+ new AMQShortString(throwable.getMessage()) // replyText
+ ));
protocolSession.close();
}
}
@@ -203,6 +206,7 @@
if (message instanceof AMQDataBlock)
{
amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
+
}
else if (message instanceof ByteBuffer)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Aug 30 05:19:31 2007
@@ -20,17 +20,8 @@
*/
package org.apache.qpid.server.queue;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/** Combines the information that make up a deliverable message into a more manageable form. */
-
import org.apache.log4j.Logger;
-
import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -40,22 +31,33 @@
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.messageStore.StorableMessage;
import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Combines the information that make up a deliverable message into a more manageable form.
*/
public class AMQMessage implements StorableMessage
{
+ /** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- // The ordered list of queues into which this message is enqueued.
+ // The ordered list of queues into which this message is enqueued.
private List<StorableQueue> _queues = new LinkedList<StorableQueue>();
// Indicates whether this message is staged
private boolean _isStaged = false;
@@ -66,7 +68,7 @@
private Set<Object> _tokens;
/**
- * Only use in clustering - should ideally be removed?
+ * Only use in clustering - //todo: should ideally be removed?
*/
private AMQProtocolSession _publisher;
@@ -76,12 +78,13 @@
private AMQMessageHandle _messageHandle;
+ /** Holds the transactional context in which this message is being processed. */
// TODO: ideally this should be able to go into the transient message date - check this! (RG)
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
- * messages published with the 'immediate' flag.
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
/**
@@ -89,18 +92,25 @@
* checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
* removed from the store.
*/
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
private boolean _immediate;
// private Subscription _takenBySubcription;
// private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
+ //todo: this should be part of a messageOnQueue object
private Set<Subscription> _rejectedBy = null;
+ //todo: this should be part of a messageOnQueue object
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+ //todo: this should be part of a messageOnQueue object
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
private final int hashcode = System.identityHashCode(this);
+
+ //todo: this should be part of a messageOnQueue object
private long _expiration;
public String debugIdentity()
@@ -111,9 +121,9 @@
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
{
@@ -176,8 +186,8 @@
{
AMQBody cb =
- getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
- _messageId, ++_index));
+ getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+ _messageId, ++_index));
return new AMQFrame(_channel, cb);
}
@@ -259,10 +269,11 @@
* @param messageId
* @param store
* @param factory
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
- throws AMQException
+ throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(store, this, true);
@@ -279,7 +290,7 @@
* @param contentHeader
*/
public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
- ContentHeaderBody contentHeader) throws AMQException
+ ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
@@ -294,11 +305,12 @@
* @param contentHeader
* @param destinationQueues
* @param contentBodies
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
- ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
- MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+ MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
@@ -443,22 +455,23 @@
}
public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
- throws AMQException
+ throws AMQException
{
final boolean persistent = isPersistent();
_messageHandle = factory.createMessageHandle(store, this, persistent);
- // if (persistent)
- // {
- _txnContext.beginTranIfNecessary();
- // }
+ if (persistent) //DTX was removed
+ {
+ _txnContext.beginTranIfNecessary();
+ }
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
- // for (AMQQueue q : _transientMessageData.getDestinationQueues())
- // {
- // _messageHandle.enqueue(storeContext, _messageId, q);
- // }
+ //DTX was removed
+ for (AMQQueue q : _transientMessageData.getDestinationQueues())
+ {
+ _messageHandle.enqueue(storeContext, _messageId, q);
+ }
if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
@@ -494,11 +507,12 @@
*/
public AMQMessage takeReference()
{
- _referenceCount.incrementAndGet();
+ incrementReference(); // _referenceCount.incrementAndGet();
return this;
}
+
/**
* Threadsafe. Increment the reference count on the message.
*/
@@ -516,6 +530,7 @@
* message store.
*
* @param storeContext
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -555,7 +570,7 @@
if (count < 0)
{
throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.",
- null);
+ null);
}
}
}
@@ -684,6 +699,7 @@
* AMQMessageHandle implementation can be picked based on various criteria.
*
* @param queue the queue
+ *
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
public void enqueue(AMQQueue queue) throws AMQException
@@ -756,14 +772,13 @@
/**
* Checks to see if the message has expired. If it has the message is dequeued.
*
- * @param storecontext
- * @param queue
+ * @param queue The queue to check the expiration against. (Currently not used)
*
* @return true if the message has expire
*
* @throws AMQException
*/
- public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+ public boolean expired(AMQQueue queue) throws AMQException
{
// note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
@@ -771,12 +786,7 @@
{
long now = System.currentTimeMillis();
- if (now > _expiration)
- {
- dequeue(storecontext, queue);
-
- return true;
- }
+ return (now > _expiration);
}
return false;
@@ -803,7 +813,7 @@
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
- _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
+ _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -1039,7 +1049,7 @@
// _taken + " by :" + _takenBySubcription;
return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
- + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
@@ -1053,6 +1063,7 @@
public void reject(Subscription subscription)
{
+
if (subscription != null)
{
if (_rejectedBy == null)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Aug 30 05:19:31 2007
@@ -20,38 +20,26 @@
*/
package org.apache.qpid.server.queue;
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.JMException;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exception.InternalErrorException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.messageStore.StorableMessage;
import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
-
import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -65,6 +53,49 @@
*/
public class AMQQueue implements Managable, Comparable, StorableQueue
{
+ //FROM M2 - think these have been replaced by *Exception in the broker exception package
+// /**
+// * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
+// * already exists.
+// *
+// * <p/><table id="crc"><caption>CRC Card</caption>
+// * <tr><th> Responsibilities <th> Collaborations
+// * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+// * </table>
+// *
+// * @todo Not an AMQP exception as no status code.
+// *
+// * @todo Move to top level, used outside this class.
+// */
+// public static final class ExistingExclusiveSubscription extends AMQException
+// {
+//
+// public ExistingExclusiveSubscription()
+// {
+// super("");
+// }
+// }
+//
+// /**
+// * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
+// * already exists.
+// *
+// * <p/><table id="crc"><caption>CRC Card</caption>
+// * <tr><th> Responsibilities <th> Collaborations
+// * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
+// * </table>
+// *
+// * @todo Not an AMQP exception as no status code.
+// *
+// * @todo Move to top level, used outside this class.
+// */
+// public static final class ExistingSubscriptionPreventsExclusive extends AMQException
+// {
+// public ExistingSubscriptionPreventsExclusive()
+// {
+// super("");
+ // }
+ // }
public static int s_queueID = 0;
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
@@ -163,22 +194,22 @@
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
+ throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
- new SubscriptionSet(), new SubscriptionImpl.Factory());
+ new SubscriptionSet(), new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
+ VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
- new SubscriptionImpl.Factory());
+ new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
- SubscriptionFactory subscriptionFactory) throws AMQException
+ VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
+ SubscriptionFactory subscriptionFactory) throws AMQException
{
if (name == null)
{
@@ -298,32 +329,217 @@
* (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
* locks from both queues and start async delivery
*
- * @param fromMessageId
- * @param toMessageId
- * @param queueName
- * @param storeContext
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
*/
public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
+ StoreContext storeContext)
+ {
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues on the same message store.");
+ }
+
+ try
+ {
+ // Obtain locks to prevent activity on the queues being moved between.
+ startMovingMessages();
+ toQueue.startMovingMessages();
+
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+ toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+ }
+
+ // Commit and flush the move transactions.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between to continue.
+ finally
+ {
+ toQueue.stopMovingMessages();
+ stopMovingMessages();
+ }
+ }
+
+ /**
+ * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity
+ * on the queues being moved between is suspended during the move.
+ *
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
+ */
+ public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+ StoreContext storeContext)
+ {
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues on the same message store.");
+ }
+
+ try
+ {
+ // Obtain locks to prevent activity on the queues being moved between.
+ startMovingMessages();
+ toQueue.startMovingMessages();
+
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+ message.takeReference();
+ }
+
+ // Commit and flush the move transactions.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between to continue.
+ finally
+ {
+ toQueue.stopMovingMessages();
+ stopMovingMessages();
+ }
+ }
+
+ /**
+ * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
+ * on the queues being moved between is suspended during the remove.
+ *
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
+ */
+ public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
{
- // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
- AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+
try
{
+ // Obtain locks to prevent activity on the queues being moved between.
startMovingMessages();
+
+ // Get the list of messages to move.
List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
- // move messages to another queue
- anotherQueue.startMovingMessages();
- anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // remove the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+ }
+
+ // Commit and flush the move transactions.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
- // moving is successful, now remove from original queue
- _deliveryMgr.removeMovedMessages(foundMessagesList);
+ // remove the messages on the in-memory queues.
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
}
+ // Release locks to allow activity on the queues being moved between to continue.
finally
{
- // remove the lock and start the async delivery
- anotherQueue.stopMovingMessages();
stopMovingMessages();
}
}
@@ -426,14 +642,16 @@
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- try
- {
- _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
- }
- catch (InternalErrorException e)
- {
- throw new AMQException(null, "Problem binding queue ", e);
- }
+ _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+ //DTX MessageStore
+// try
+// {
+// _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+// }
+// catch (InternalErrorException e)
+// {
+// throw new AMQException(null, "Problem binding queue ", e);
+// }
}
_bindings.addBinding(routingKey, arguments, exchange);
@@ -444,21 +662,24 @@
exchange.deregisterQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- try
- {
- _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
- }
- catch (InternalErrorException e)
- {
- throw new AMQException(null, "problem unbinding queue", e);
- }
+
+ _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+ //DTX MessageStore
+// try
+// {
+// _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+// }
+// catch (InternalErrorException e)
+// {
+// throw new AMQException(null, "problem unbinding queue", e);
+// }
}
_bindings.remove(routingKey, arguments, exchange);
}
public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
{
if (incrementSubscriberCount() > 1)
{
@@ -487,7 +708,7 @@
}
Subscription subscription =
- _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+ _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
if (subscription.filtersMessages())
{
@@ -532,11 +753,11 @@
Subscription removedSubscription;
if ((removedSubscription =
- _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
- == null)
+ _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
+ == null)
{
throw new AMQException(null, "Protocol session with channel " + channel + " and consumer tag " + consumerTag
- + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null);
+ + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null);
}
removedSubscription.close();
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Aug 30 05:19:31 2007
@@ -18,30 +18,23 @@
* under the License.
*
*/
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
package org.apache.qpid.server.queue;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -60,30 +53,25 @@
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
/**
* MBean class for AMQQueue. It implements all the management features exposed
* for an AMQQueue.
+ * <p/><table id="crc"><caption>CRC Caption</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+ /** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
/**