You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [8/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java Wed Aug 8 11:56:59 2007
@@ -21,13 +21,14 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.util.DataByteArrayInputStream;
+
/**
* Optimized Store reader
*
* @version $Revision: 1.1.1.1 $
*/
public final class SyncDataFileReader {
-
+
private DataManagerImpl dataManager;
private DataByteArrayInputStream dataIn;
@@ -36,14 +37,16 @@
*
* @param fileId
*/
- SyncDataFileReader(DataManagerImpl fileManager){
- this.dataManager=fileManager;
- this.dataIn=new DataByteArrayInputStream();
+ SyncDataFileReader(DataManagerImpl fileManager) {
+ this.dataManager = fileManager;
+ this.dataIn = new DataByteArrayInputStream();
}
- /* (non-Javadoc)
- * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
- */
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
+ */
public synchronized byte readDataItemSize(DataItem item) throws IOException {
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
file.seek(item.getOffset()); // jump to the size field
@@ -51,17 +54,20 @@
item.setSize(file.readInt());
return rc;
}
-
- /* (non-Javadoc)
- * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
- */
- public synchronized Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
- RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
-
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller,
+ * org.apache.activemq.kaha.StoreLocation)
+ */
+ public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
+ RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
// allocating byte[] arrays on every readItem.
- byte[] data=new byte[item.getSize()];
- file.seek(item.getOffset()+DataManagerImpl.ITEM_HEAD_SIZE);
+ byte[] data = new byte[item.getSize()];
+ file.seek(item.getOffset() + DataManagerImpl.ITEM_HEAD_SIZE);
file.readFully(data);
dataIn.restart(data);
return marshaller.readPayload(dataIn);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java Wed Aug 8 11:56:59 2007
@@ -21,87 +21,95 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.util.DataByteArrayOutputStream;
+
/**
- * Optimized Store writer. Synchronously marshalls and writes to the data file. Simple but
- * may introduce a bit of contention when put under load.
+ * Optimized Store writer. Synchronously marshalls and writes to the data file.
+ * Simple but may introduce a bit of contention when put under load.
*
* @version $Revision: 1.1.1.1 $
*/
final public class SyncDataFileWriter {
-
+
private DataByteArrayOutputStream buffer;
private DataManagerImpl dataManager;
-
/**
* Construct a Store writer
*
* @param fileId
*/
- SyncDataFileWriter(DataManagerImpl fileManager){
- this.dataManager=fileManager;
- this.buffer=new DataByteArrayOutputStream();
+ SyncDataFileWriter(DataManagerImpl fileManager) {
+ this.dataManager = fileManager;
+ this.buffer = new DataByteArrayOutputStream();
}
- /* (non-Javadoc)
- * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
- */
- public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
-
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller,
+ * java.lang.Object, byte)
+ */
+ public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
+ throws IOException {
+
// Write the packet our internal buffer.
buffer.reset();
buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
- marshaller.writePayload(payload,buffer);
- int size=buffer.size();
- int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
+ marshaller.writePayload(payload, buffer);
+ int size = buffer.size();
+ int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
// Find the position where this item will land at.
- DataItem item=new DataItem();
+ DataItem item = new DataItem();
item.setSize(payloadSize);
- DataFile dataFile=dataManager.findSpaceForData(item);
-
+ DataFile dataFile = dataManager.findSpaceForData(item);
+
// Now splat the buffer to the file.
dataFile.getRandomAccessFile().seek(item.getOffset());
- dataFile.getRandomAccessFile().write(buffer.getData(),0,size);
+ dataFile.getRandomAccessFile().write(buffer.getData(), 0, size);
dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
-
+
dataManager.addInterestInFile(dataFile);
return item;
}
-
- /* (non-Javadoc)
- * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
- */
- public synchronized void updateItem(DataItem item,Marshaller marshaller, Object payload, byte type) throws IOException {
- //Write the packet our internal buffer.
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation,
+ * org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
+ */
+ public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
+ throws IOException {
+ // Write the packet our internal buffer.
buffer.reset();
buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
- marshaller.writePayload(payload,buffer);
- int size=buffer.size();
- int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
+ marshaller.writePayload(payload, buffer);
+ int size = buffer.size();
+ int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
item.setSize(payloadSize);
- DataFile dataFile = dataManager.getDataFile(item);
+ DataFile dataFile = dataManager.getDataFile(item);
RandomAccessFile file = dataFile.getRandomAccessFile();
file.seek(item.getOffset());
- file.write(buffer.getData(),0,size);
+ file.write(buffer.getData(), 0, size);
dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
}
- public synchronized void force(DataFile dataFile) throws IOException {
- // If our dirty marker was set.. then we need to sync
- if( dataFile.getWriterData()!=null && dataFile.isDirty()) {
- dataFile.getRandomAccessFile().getFD().sync();
- dataFile.setWriterData(null);
+ public synchronized void force(DataFile dataFile) throws IOException {
+ // If our dirty marker was set.. then we need to sync
+ if (dataFile.getWriterData() != null && dataFile.isDirty()) {
+ dataFile.getRandomAccessFile().getFD().sync();
+ dataFile.setWriterData(null);
dataFile.setDirty(false);
- }
- }
+ }
+ }
- public void close() throws IOException {
- }
+ public void close() throws IOException {
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/BadMagicException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/BadMagicException.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/BadMagicException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/BadMagicException.java Wed Aug 8 11:56:59 2007
@@ -23,25 +23,26 @@
*
* @version $Revision: 1.2 $
*/
-public class BadMagicException extends IOException{
+public class BadMagicException extends IOException {
/**
*
*/
- private static final long serialVersionUID=-570930196733067056L;
+ private static final long serialVersionUID = -570930196733067056L;
/**
* Default Constructor
- *
+ *
*/
- public BadMagicException(){
+ public BadMagicException() {
super();
}
/**
* Construct an Exception with a reason
+ *
* @param s
*/
- public BadMagicException(String s){
+ public BadMagicException(String s) {
super(s);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java Wed Aug 8 11:56:59 2007
@@ -23,11 +23,12 @@
*
* @version $Revision: 1.2 $
*/
-public interface Index{
+public interface Index {
/**
- * clear the index
- * @throws IOException
+ * clear the index
+ *
+ * @throws IOException
*
*/
public void clear() throws IOException;
@@ -35,7 +36,7 @@
/**
* @param key
* @return true if it contains the key
- * @throws IOException
+ * @throws IOException
*/
public boolean containsKey(Object key) throws IOException;
@@ -44,7 +45,7 @@
*
* @param key
* @return StoreEntry removed
- * @throws IOException
+ * @throws IOException
*/
public StoreEntry remove(Object key) throws IOException;
@@ -53,15 +54,14 @@
*
* @param key
* @param entry
- * @throws IOException
+ * @throws IOException
*/
- public void store(Object key,StoreEntry entry) throws IOException;
-
-
+ public void store(Object key, StoreEntry entry) throws IOException;
+
/**
* @param key
* @return the entry
- * @throws IOException
+ * @throws IOException
*/
public StoreEntry get(Object key) throws IOException;
@@ -69,22 +69,22 @@
* @return true if the index is transient
*/
public boolean isTransient();
-
+
/**
* load indexes
*/
public void load();
-
+
/**
* unload indexes
- * @throws IOException
+ *
+ * @throws IOException
*/
public void unload() throws IOException;
-
-
-
+
/**
* Set the marshaller for key objects
+ *
* @param marshaller
*/
public void setKeyMarshaller(Marshaller marshaller);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java Wed Aug 8 11:56:59 2007
@@ -19,17 +19,17 @@
import org.apache.activemq.kaha.StoreEntry;
/**
-* Inteface to LinkedList of Indexes
-*
-* @version $Revision$
-*/
-public interface IndexLinkedList{
-
+ * Inteface to LinkedList of Indexes
+ *
+ * @version $Revision$
+ */
+public interface IndexLinkedList {
+
/**
* @return the root used by the List
*/
public IndexItem getRoot();
-
+
/**
* Returns the first element in this list.
*
@@ -60,14 +60,16 @@
/**
* Inserts the given element at the beginning of this list.
- * @param item
+ *
+ * @param item
*/
public void addFirst(IndexItem item);
/**
- * Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
- * only for consistency.)
- * @param item
+ * Appends the given element to the end of this list. (Identical in function
+ * to the <tt>add</tt> method; included only for consistency.)
+ *
+ * @param item
*/
public void addLast(IndexItem item);
@@ -87,9 +89,11 @@
/**
* Appends the specified element to the end of this list.
- * @param item
*
- * @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
+ * @param item
+ *
+ * @return <tt>true</tt> (as per the general contract of
+ * <tt>Collection.add</tt>).
*/
public boolean add(IndexItem item);
@@ -105,24 +109,27 @@
* @param index index of element to return.
* @return the element at the specified position in this list.
*
- * @throws IndexOutOfBoundsException if the specified index is is out of range (<tt>index < 0 || index >= size()</tt>).
+ * @throws IndexOutOfBoundsException if the specified index is is out of
+ * range (<tt>index < 0 || index >= size()</tt>).
*/
public IndexItem get(int index);
/**
- * Inserts the specified element at the specified position in this list. Shifts the element currently at that
- * position (if any) and any subsequent elements to the right (adds one to their indices).
+ * Inserts the specified element at the specified position in this list.
+ * Shifts the element currently at that position (if any) and any subsequent
+ * elements to the right (adds one to their indices).
*
* @param index index at which the specified element is to be inserted.
* @param element element to be inserted.
*
* @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index < 0 || index > size()</tt>).
*/
- public void add(int index,IndexItem element);
+ public void add(int index, IndexItem element);
/**
- * Removes the element at the specified position in this list. Shifts any subsequent elements to the left (subtracts
- * one from their indices). Returns the element that was removed from the list.
+ * Removes the element at the specified position in this list. Shifts any
+ * subsequent elements to the left (subtracts one from their indices).
+ * Returns the element that was removed from the list.
*
* @param index the index of the element to removed.
* @return the element previously at the specified position.
@@ -133,13 +140,15 @@
// Search Operations
/**
- * Returns the index in this list of the first occurrence of the specified element, or -1 if the List does not
- * contain this element. More formally, returns the lowest index i such that
- * <tt>(o==null ? get(i)==null : o.equals(get(i)))</tt>, or -1 if there is no such index.
+ * Returns the index in this list of the first occurrence of the specified
+ * element, or -1 if the List does not contain this element. More formally,
+ * returns the lowest index i such that
+ * <tt>(o==null ? get(i)==null : o.equals(get(i)))</tt>, or -1 if there
+ * is no such index.
*
* @param o element to search for.
- * @return the index in this list of the first occurrence of the specified element, or -1 if the list does not
- * contain this element.
+ * @return the index in this list of the first occurrence of the specified
+ * element, or -1 if the list does not contain this element.
*/
public int indexOf(StoreEntry o);
@@ -159,22 +168,24 @@
*/
public IndexItem getPrevEntry(IndexItem entry);
-
/**
* remove an entry
+ *
* @param e
*/
public void remove(IndexItem e);
-
+
/**
* Ensure we have the up to date entry
- * @param entry
+ *
+ * @param entry
* @return the entry
*/
- public StoreEntry getEntry(StoreEntry entry);
-
+ public StoreEntry getEntry(StoreEntry entry);
+
/**
* Update the indexes of a StoreEntry
+ *
* @param current
* @return update StoreEntry
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/RedoStoreIndexItem.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/RedoStoreIndexItem.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/RedoStoreIndexItem.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/RedoStoreIndexItem.java Wed Aug 8 11:56:59 2007
@@ -35,11 +35,11 @@
}
public void writePayload(Object object, DataOutput out) throws IOException {
- RedoStoreIndexItem item = (RedoStoreIndexItem) object;
+ RedoStoreIndexItem item = (RedoStoreIndexItem)object;
item.writeExternal(out);
}
- };
-
+ };
+
private static final long serialVersionUID = -4865508871719676655L;
private String indexName;
private IndexItem indexItem;
@@ -47,47 +47,54 @@
public RedoStoreIndexItem() {
}
+
public RedoStoreIndexItem(String indexName, long offset, IndexItem item) {
this.indexName = indexName;
- this.offset=offset;
+ this.offset = offset;
this.indexItem = item;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
readExternal((DataInput)in);
}
+
public void readExternal(DataInput in) throws IOException {
// indexName = in.readUTF();
offset = in.readLong();
indexItem = new IndexItem();
indexItem.read(in);
}
-
+
public void writeExternal(ObjectOutput out) throws IOException {
writeExternal((DataOutput)out);
}
+
public void writeExternal(DataOutput out) throws IOException {
// out.writeUTF(indexName);
out.writeLong(offset);
indexItem.write(out);
}
-
+
public String getIndexName() {
return indexName;
}
+
public void setIndexName(String indexName) {
this.indexName = indexName;
}
-
+
public IndexItem getIndexItem() {
return indexItem;
}
+
public void setIndexItem(IndexItem item) {
this.indexItem = item;
}
+
public long getOffset() {
return offset;
}
+
public void setOffset(long offset) {
this.offset = offset;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java Wed Aug 8 11:56:59 2007
@@ -20,40 +20,41 @@
import java.io.RandomAccessFile;
import org.apache.activemq.util.DataByteArrayInputStream;
+
/**
* Optimized Store reader
*
* @version $Revision: 1.1.1.1 $
*/
-class StoreIndexReader{
+class StoreIndexReader {
protected RandomAccessFile file;
protected DataByteArrayInputStream dataIn;
- protected byte[] buffer=new byte[IndexItem.INDEX_SIZE];
+ protected byte[] buffer = new byte[IndexItem.INDEX_SIZE];
/**
* Construct a Store reader
*
* @param file
*/
- StoreIndexReader(RandomAccessFile file){
- this.file=file;
- this.dataIn=new DataByteArrayInputStream();
+ StoreIndexReader(RandomAccessFile file) {
+ this.file = file;
+ this.dataIn = new DataByteArrayInputStream();
}
- protected IndexItem readItem(long offset) throws IOException{
+ protected IndexItem readItem(long offset) throws IOException {
file.seek(offset);
file.readFully(buffer);
dataIn.restart(buffer);
- IndexItem result=new IndexItem();
+ IndexItem result = new IndexItem();
result.setOffset(offset);
result.read(dataIn);
return result;
}
-
- void updateIndexes(IndexItem indexItem) throws IOException{
- if (indexItem != null){
+
+ void updateIndexes(IndexItem indexItem) throws IOException {
+ if (indexItem != null) {
file.seek(indexItem.getOffset());
- file.readFully(buffer,0,IndexItem.INDEXES_ONLY_SIZE);
+ file.readFully(buffer, 0, IndexItem.INDEXES_ONLY_SIZE);
dataIn.restart(buffer);
indexItem.readIndexes(dataIn);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java Wed Aug 8 11:56:59 2007
@@ -21,13 +21,14 @@
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.util.DataByteArrayOutputStream;
+
/**
* Optimized Store writer
*
* @version $Revision: 1.1.1.1 $
*/
-class StoreIndexWriter{
-
+class StoreIndexWriter {
+
protected final DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
protected final RandomAccessFile file;
protected final String name;
@@ -38,46 +39,46 @@
*
* @param file
*/
- StoreIndexWriter(RandomAccessFile file){
+ StoreIndexWriter(RandomAccessFile file) {
this(file, null, null);
}
public StoreIndexWriter(RandomAccessFile file, String indexName, DataManager redoLog) {
- this.file=file;
+ this.file = file;
this.name = indexName;
this.redoLog = redoLog;
}
- void storeItem(IndexItem indexItem) throws IOException{
-
- if( redoLog!=null ) {
+ void storeItem(IndexItem indexItem) throws IOException {
+
+ if (redoLog != null) {
RedoStoreIndexItem redo = new RedoStoreIndexItem(name, indexItem.getOffset(), indexItem);
redoLog.storeRedoItem(redo);
}
-
+
dataOut.reset();
indexItem.write(dataOut);
file.seek(indexItem.getOffset());
- file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
+ file.write(dataOut.getData(), 0, IndexItem.INDEX_SIZE);
}
-
- void updateIndexes(IndexItem indexItem) throws IOException{
- if( redoLog!=null ) {
+
+ void updateIndexes(IndexItem indexItem) throws IOException {
+ if (redoLog != null) {
RedoStoreIndexItem redo = new RedoStoreIndexItem(name, indexItem.getOffset(), indexItem);
redoLog.storeRedoItem(redo);
}
-
+
dataOut.reset();
indexItem.updateIndexes(dataOut);
file.seek(indexItem.getOffset());
- file.write(dataOut.getData(),0,IndexItem.INDEXES_ONLY_SIZE);
+ file.write(dataOut.getData(), 0, IndexItem.INDEXES_ONLY_SIZE);
}
public void redoStoreItem(RedoStoreIndexItem redo) throws IOException {
dataOut.reset();
redo.getIndexItem().write(dataOut);
file.seek(redo.getOffset());
- file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
+ file.write(dataOut.getData(), 0, IndexItem.INDEX_SIZE);
}
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java Wed Aug 8 11:56:59 2007
@@ -28,19 +28,20 @@
*
* @version $Revision: 1.2 $
*/
-public class VMIndex implements Index{
- private static final Log log=LogFactory.getLog(VMIndex.class);
+public class VMIndex implements Index {
+ private static final Log log = LogFactory.getLog(VMIndex.class);
private IndexManager indexManager;
- private Map<Object,StoreEntry> map=new HashMap<Object,StoreEntry>();
+ private Map<Object, StoreEntry> map = new HashMap<Object, StoreEntry>();
public VMIndex(IndexManager manager) {
- this.indexManager= manager;
+ this.indexManager = manager;
}
+
/**
*
* @see org.apache.activemq.kaha.impl.index.Index#clear()
*/
- public void clear(){
+ public void clear() {
map.clear();
}
@@ -49,7 +50,7 @@
* @return true if the index contains the key
* @see org.apache.activemq.kaha.impl.index.Index#containsKey(java.lang.Object)
*/
- public boolean containsKey(Object key){
+ public boolean containsKey(Object key) {
return map.containsKey(key);
}
@@ -58,14 +59,14 @@
* @return store entry
* @see org.apache.activemq.kaha.impl.index.Index#removeKey(java.lang.Object)
*/
- public StoreEntry remove(Object key){
- StoreEntry result = map.remove(key);
+ public StoreEntry remove(Object key) {
+ StoreEntry result = map.remove(key);
if (result != null) {
- try{
- result=indexManager.refreshIndex((IndexItem)result);
- }catch(IOException e){
- log.error("Failed to refresh entry",e);
- throw new RuntimeException("Failed to refresh entry");
+ try {
+ result = indexManager.refreshIndex((IndexItem)result);
+ } catch (IOException e) {
+ log.error("Failed to refresh entry", e);
+ throw new RuntimeException("Failed to refresh entry");
}
}
return result;
@@ -77,22 +78,22 @@
* @see org.apache.activemq.kaha.impl.index.Index#store(java.lang.Object,
* org.apache.activemq.kaha.impl.index.IndexItem)
*/
- public void store(Object key,StoreEntry entry){
- map.put(key,entry);
+ public void store(Object key, StoreEntry entry) {
+ map.put(key, entry);
}
/**
* @param key
* @return the entry
*/
- public StoreEntry get(Object key){
- StoreEntry result = map.get(key);
+ public StoreEntry get(Object key) {
+ StoreEntry result = map.get(key);
if (result != null) {
- try{
- result=indexManager.refreshIndex((IndexItem)result);
- }catch(IOException e){
- log.error("Failed to refresh entry",e);
- throw new RuntimeException("Failed to refresh entry");
+ try {
+ result = indexManager.refreshIndex((IndexItem)result);
+ } catch (IOException e) {
+ log.error("Failed to refresh entry", e);
+ throw new RuntimeException("Failed to refresh entry");
}
}
return result;
@@ -101,24 +102,23 @@
/**
* @return true if the index is transient
*/
- public boolean isTransient(){
+ public boolean isTransient() {
return true;
}
/**
* load indexes
*/
- public void load(){
+ public void load() {
}
/**
* unload indexes
*/
- public void unload(){
+ public void unload() {
map.clear();
}
-
-
- public void setKeyMarshaller(Marshaller marshaller){
+
+ public void setKeyMarshaller(Marshaller marshaller) {
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java Wed Aug 8 11:56:59 2007
@@ -24,75 +24,75 @@
*
* @version $Revision: 1.1.1.1 $
*/
-class HashEntry implements Comparable{
+class HashEntry implements Comparable {
- static final int NOT_SET=-1;
+ static final int NOT_SET = -1;
private Comparable key;
private long indexOffset;
- public int compareTo(Object o){
- if(o instanceof HashEntry){
- HashEntry other=(HashEntry)o;
+ public int compareTo(Object o) {
+ if (o instanceof HashEntry) {
+ HashEntry other = (HashEntry)o;
return key.compareTo(other.key);
- }else{
+ } else {
return key.compareTo(o);
}
}
- public boolean equals(Object o){
- return compareTo(o)==0;
+ public boolean equals(Object o) {
+ return compareTo(o) == 0;
}
- public int hashCode(){
+ public int hashCode() {
return key.hashCode();
}
- public String toString(){
- return "HashEntry("+key+","+indexOffset+")";
+ public String toString() {
+ return "HashEntry(" + key + "," + indexOffset + ")";
}
- HashEntry copy(){
- HashEntry copy=new HashEntry();
- copy.key=this.key;
- copy.indexOffset=this.indexOffset;
+ HashEntry copy() {
+ HashEntry copy = new HashEntry();
+ copy.key = this.key;
+ copy.indexOffset = this.indexOffset;
return copy;
}
/**
* @return the key
*/
- Comparable getKey(){
+ Comparable getKey() {
return this.key;
}
/**
* @param key the key to set
*/
- void setKey(Comparable key){
- this.key=key;
+ void setKey(Comparable key) {
+ this.key = key;
}
/**
* @return the indexOffset
*/
- long getIndexOffset(){
+ long getIndexOffset() {
return this.indexOffset;
}
/**
* @param indexOffset the indexOffset to set
*/
- void setIndexOffset(long indexOffset){
- this.indexOffset=indexOffset;
+ void setIndexOffset(long indexOffset) {
+ this.indexOffset = indexOffset;
}
- void write(Marshaller keyMarshaller,DataOutput dataOut) throws IOException{
+ void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
dataOut.writeLong(indexOffset);
- keyMarshaller.writePayload(key,dataOut);
+ keyMarshaller.writePayload(key, dataOut);
}
- void read(Marshaller keyMarshaller,DataInput dataIn) throws IOException{
- indexOffset=dataIn.readLong();
- key=(Comparable)keyMarshaller.readPayload(dataIn);
+ void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
+ indexOffset = dataIn.readLong();
+ key = (Comparable)keyMarshaller.readPayload(dataIn);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java Wed Aug 8 11:56:59 2007
@@ -24,123 +24,120 @@
*
* @version $Revision: 1.1.1.1 $
*/
-class TreeEntry implements Comparable{
+class TreeEntry implements Comparable {
- static final int NOT_SET=-1;
+ static final int NOT_SET = -1;
private Comparable key;
private long indexOffset;
- private long prevPageId=NOT_SET;
- private long nextPageId=NOT_SET;
+ private long prevPageId = NOT_SET;
+ private long nextPageId = NOT_SET;
- public int compareTo(Object o){
- if(o instanceof TreeEntry){
- TreeEntry other=(TreeEntry)o;
+ public int compareTo(Object o) {
+ if (o instanceof TreeEntry) {
+ TreeEntry other = (TreeEntry)o;
return key.compareTo(other.key);
- }else{
+ } else {
return key.compareTo(o);
}
}
- public boolean equals(Object o){
- return compareTo(o)==0;
+ public boolean equals(Object o) {
+ return compareTo(o) == 0;
}
- public int hashCode(){
+ public int hashCode() {
return key.hashCode();
}
- public String toString(){
- return "TreeEntry("+key+","+indexOffset+")prev="+prevPageId+",next="+nextPageId;
+ public String toString() {
+ return "TreeEntry(" + key + "," + indexOffset + ")prev=" + prevPageId + ",next=" + nextPageId;
}
- void reset(){
- prevPageId=nextPageId=NOT_SET;
+ void reset() {
+ prevPageId = nextPageId = NOT_SET;
}
- TreeEntry copy(){
- TreeEntry copy=new TreeEntry();
- copy.key=this.key;
- copy.indexOffset=this.indexOffset;
- copy.prevPageId=this.prevPageId;
- copy.nextPageId=this.nextPageId;
+ TreeEntry copy() {
+ TreeEntry copy = new TreeEntry();
+ copy.key = this.key;
+ copy.indexOffset = this.indexOffset;
+ copy.prevPageId = this.prevPageId;
+ copy.nextPageId = this.nextPageId;
return copy;
}
/**
* @return the key
*/
- Comparable getKey(){
+ Comparable getKey() {
return this.key;
}
/**
* @param key the key to set
*/
- void setKey(Comparable key){
- this.key=key;
+ void setKey(Comparable key) {
+ this.key = key;
}
/**
* @return the nextPageId
*/
- long getNextPageId(){
+ long getNextPageId() {
return this.nextPageId;
}
/**
* @param nextPageId the nextPageId to set
*/
- void setNextPageId(long nextPageId){
- this.nextPageId=nextPageId;
+ void setNextPageId(long nextPageId) {
+ this.nextPageId = nextPageId;
}
/**
* @return the prevPageId
*/
- long getPrevPageId(){
+ long getPrevPageId() {
return this.prevPageId;
}
/**
* @param prevPageId the prevPageId to set
*/
- void setPrevPageId(long prevPageId){
- this.prevPageId=prevPageId;
+ void setPrevPageId(long prevPageId) {
+ this.prevPageId = prevPageId;
}
-
+
/**
* @return the indexOffset
*/
- long getIndexOffset(){
+ long getIndexOffset() {
return this.indexOffset;
}
-
/**
* @param indexOffset the indexOffset to set
*/
- void setIndexOffset(long indexOffset){
- this.indexOffset=indexOffset;
+ void setIndexOffset(long indexOffset) {
+ this.indexOffset = indexOffset;
}
- boolean hasChildPagesReferences(){
- return prevPageId!=NOT_SET||nextPageId!=NOT_SET;
+ boolean hasChildPagesReferences() {
+ return prevPageId != NOT_SET || nextPageId != NOT_SET;
}
- void write(Marshaller keyMarshaller,DataOutput dataOut) throws IOException{
- keyMarshaller.writePayload(key,dataOut);
+ void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
+ keyMarshaller.writePayload(key, dataOut);
dataOut.writeLong(indexOffset);
dataOut.writeLong(nextPageId);
dataOut.writeLong(prevPageId);
}
- void read(Marshaller keyMarshaller,DataInput dataIn) throws IOException{
- key=(Comparable)keyMarshaller.readPayload(dataIn);
- indexOffset=dataIn.readLong();
- nextPageId=dataIn.readLong();
- prevPageId=dataIn.readLong();
+ void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
+ key = (Comparable)keyMarshaller.readPayload(dataIn);
+ indexOffset = dataIn.readLong();
+ nextPageId = dataIn.readLong();
+ prevPageId = dataIn.readLong();
}
-
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreePageEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreePageEntry.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreePageEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreePageEntry.java Wed Aug 8 11:56:59 2007
@@ -15,81 +15,77 @@
package org.apache.activemq.kaha.impl.index.tree;
/**
-* A conglomarate used for return results from a tree lookup
-*
-* @version $Revision: 1.1.1.1 $
-*/
-class TreePageEntry{
+ * A conglomarate used for return results from a tree lookup
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class TreePageEntry {
private TreeEntry treeEntry;
private TreePage treePage;
private TreePage.Flavour flavour;
private int index = -1;
- TreePageEntry(TreeEntry treeEntry,TreePage treePage,TreePage.Flavour flavour, int index){
+ TreePageEntry(TreeEntry treeEntry, TreePage treePage, TreePage.Flavour flavour, int index) {
this.treeEntry = treeEntry;
- this.treePage=treePage;
- this.flavour=flavour;
+ this.treePage = treePage;
+ this.flavour = flavour;
this.index = index;
}
/**
* @return the flavour
*/
- TreePage.Flavour getFlavour(){
+ TreePage.Flavour getFlavour() {
return this.flavour;
}
/**
* @param flavour the flavour to set
*/
- void setFlavour(TreePage.Flavour flavour){
- this.flavour=flavour;
+ void setFlavour(TreePage.Flavour flavour) {
+ this.flavour = flavour;
}
/**
* @return the treePage
*/
- TreePage getTreePage(){
+ TreePage getTreePage() {
return this.treePage;
}
/**
* @param treePage the treePage to set
*/
- void setTreePage(TreePage treePage){
- this.treePage=treePage;
+ void setTreePage(TreePage treePage) {
+ this.treePage = treePage;
}
-
/**
* @return the index
*/
- public int getIndex(){
+ public int getIndex() {
return this.index;
}
-
/**
* @param index the index to set
*/
- public void setIndex(int index){
- this.index=index;
+ public void setIndex(int index) {
+ this.index = index;
}
-
/**
* @return the treeEntry
*/
- public TreeEntry getTreeEntry(){
+ public TreeEntry getTreeEntry() {
return this.treeEntry;
}
-
/**
* @param treeEntry the treeEntry to set
*/
- public void setTreeEntry(TreeEntry treeEntry){
- this.treeEntry=treeEntry;
+ public void setTreeEntry(TreeEntry treeEntry) {
+ this.treeEntry = treeEntry;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/CountStatisticImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/CountStatisticImpl.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/CountStatisticImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/CountStatisticImpl.java Wed Aug 8 11:56:59 2007
@@ -22,7 +22,7 @@
/**
* A count statistic implementation
- *
+ *
* @version $Revision: 1.3 $
*/
public class CountStatisticImpl extends StatisticImpl implements CountStatistic {
@@ -53,50 +53,50 @@
}
public void setCount(long count) {
- if(isEnabled()) {
+ if (isEnabled()) {
counter.set(count);
- }
+ }
}
public void add(long amount) {
- if (isEnabled()) {
- counter.addAndGet(amount);
- updateSampleTime();
- if (parent != null) {
- parent.add(amount);
- }
- }
+ if (isEnabled()) {
+ counter.addAndGet(amount);
+ updateSampleTime();
+ if (parent != null) {
+ parent.add(amount);
+ }
+ }
}
public void increment() {
- if (isEnabled()) {
- counter.incrementAndGet();
- updateSampleTime();
- if (parent != null) {
- parent.increment();
- }
- }
- }
-
- public void subtract(long amount) {
- if (isEnabled()) {
- counter.addAndGet(-amount);
- updateSampleTime();
- if (parent != null) {
- parent.subtract(amount);
- }
- }
- }
-
- public void decrement() {
- if (isEnabled()) {
- counter.decrementAndGet();
- updateSampleTime();
- if (parent != null) {
- parent.decrement();
- }
- }
- }
+ if (isEnabled()) {
+ counter.incrementAndGet();
+ updateSampleTime();
+ if (parent != null) {
+ parent.increment();
+ }
+ }
+ }
+
+ public void subtract(long amount) {
+ if (isEnabled()) {
+ counter.addAndGet(-amount);
+ updateSampleTime();
+ if (parent != null) {
+ parent.subtract(amount);
+ }
+ }
+ }
+
+ public void decrement() {
+ if (isEnabled()) {
+ counter.decrementAndGet();
+ updateSampleTime();
+ if (parent != null) {
+ parent.decrement();
+ }
+ }
+ }
public CountStatisticImpl getParent() {
return parent;
@@ -111,25 +111,27 @@
buffer.append(Long.toString(counter.get()));
super.appendFieldDescription(buffer);
}
-
+
/**
- * @return the average time period that elapses between counter increments since the last reset.
+ * @return the average time period that elapses between counter increments
+ * since the last reset.
*/
public double getPeriod() {
double count = counter.get();
- if( count == 0 )
+ if (count == 0)
return 0;
double time = (System.currentTimeMillis() - getStartTime());
- return (time/(count*1000.0));
+ return (time / (count * 1000.0));
}
-
+
/**
- * @return the number of times per second that the counter is incrementing since the last reset.
+ * @return the number of times per second that the counter is incrementing
+ * since the last reset.
*/
public double getFrequency() {
double count = counter.get();
double time = (System.currentTimeMillis() - getStartTime());
- return (count*1000.0/time);
+ return (count * 1000.0 / time);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/PollCountStatisticImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/PollCountStatisticImpl.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/PollCountStatisticImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/PollCountStatisticImpl.java Wed Aug 8 11:56:59 2007
@@ -23,7 +23,7 @@
/**
* A count statistic implementation
- *
+ *
* @version $Revision$
*/
public class PollCountStatisticImpl extends StatisticImpl implements CountStatistic {
@@ -49,61 +49,63 @@
}
public void setParent(PollCountStatisticImpl parent) {
- if( this.parent !=null ) {
+ if (this.parent != null) {
this.parent.removeChild(this);
}
this.parent = parent;
- if( this.parent !=null ) {
+ if (this.parent != null) {
this.parent.addChild(this);
}
}
synchronized private void removeChild(PollCountStatisticImpl child) {
- if( children!=null )
+ if (children != null)
children.remove(child);
}
synchronized private void addChild(PollCountStatisticImpl child) {
- if( children==null )
+ if (children == null)
children = new ArrayList();
children.add(child);
}
synchronized public long getCount() {
- if ( children == null )
+ if (children == null)
return 0;
- long count=0;
+ long count = 0;
for (Iterator iter = children.iterator(); iter.hasNext();) {
- PollCountStatisticImpl child = (PollCountStatisticImpl) iter.next();
+ PollCountStatisticImpl child = (PollCountStatisticImpl)iter.next();
count += child.getCount();
}
return count;
}
-
+
protected void appendFieldDescription(StringBuffer buffer) {
buffer.append(" count: ");
buffer.append(Long.toString(getCount()));
super.appendFieldDescription(buffer);
}
-
+
/**
- * @return the average time period that elapses between counter increments since the last reset.
+ * @return the average time period that elapses between counter increments
+ * since the last reset.
*/
public double getPeriod() {
double count = getCount();
- if( count == 0 )
+ if (count == 0)
return 0;
double time = (System.currentTimeMillis() - getStartTime());
- return (time/(count*1000.0));
+ return (time / (count * 1000.0));
}
-
+
/**
- * @return the number of times per second that the counter is incrementing since the last reset.
+ * @return the number of times per second that the counter is incrementing
+ * since the last reset.
*/
public double getFrequency() {
double count = getCount();
double time = (System.currentTimeMillis() - getStartTime());
- return (count*1000.0/time);
+ return (count * 1000.0 / time);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java Wed Aug 8 11:56:59 2007
@@ -16,48 +16,49 @@
*/
package org.apache.activemq.memory;
-
/**
* Defines the interface used to cache messages.
- *
+ *
* @version $Revision$
*/
public interface Cache {
-
- /**
- * Gets an object that was previously <code>put</code> into this object.
- *
- * @param msgid
- * @return null if the object was not previously put or if the object has expired out of the cache.
- */
- public Object get(Object key);
-
- /**
- * Puts an object into the cache.
- *
- * @param messageID
- * @param message
- */
- public Object put(Object key, Object value);
-
- /**
- * Removes an object from the cache.
- *
- * @param messageID
+
+ /**
+ * Gets an object that was previously <code>put</code> into this object.
+ *
+ * @param msgid
+ * @return null if the object was not previously put or if the object has
+ * expired out of the cache.
+ */
+ public Object get(Object key);
+
+ /**
+ * Puts an object into the cache.
+ *
+ * @param messageID
+ * @param message
+ */
+ public Object put(Object key, Object value);
+
+ /**
+ * Removes an object from the cache.
+ *
+ * @param messageID
* @return the object associated with the key if it was still in the cache.
- */
- public Object remove(Object key);
+ */
+ public Object remove(Object key);
/**
- * Lets a cache know it will not be used any further and that it can release
+ * Lets a cache know it will not be used any further and that it can release
* acquired resources
*/
public void close();
/**
* How big is the cache right now?
+ *
* @return
*/
public int size();
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java Wed Aug 8 11:56:59 2007
@@ -17,16 +17,16 @@
package org.apache.activemq.memory;
public class CacheEntry {
-
- public final Object key;
+
+ public final Object key;
public final Object value;
-
+
public CacheEntry next;
public CacheEntry previous;
public CacheEntryList owner;
-
+
public CacheEntry(Object key, Object value) {
- this.key=key;
+ this.key = key;
this.value = value;
}
@@ -36,19 +36,19 @@
* @return false if you are trying to remove the tail pointer.
*/
public boolean remove() {
-
+
// Cannot remove if this is a tail pointer.
// Or not linked.
- if( owner==null || this.key==null || this.next==null )
+ if (owner == null || this.key == null || this.next == null)
return false;
-
- synchronized( owner.tail ) {
+
+ synchronized (owner.tail) {
this.next.previous = this.previous;
this.previous.next = this.next;
this.owner = null;
this.next = this.previous = null;
}
-
+
return true;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java Wed Aug 8 11:56:59 2007
@@ -17,37 +17,37 @@
package org.apache.activemq.memory;
/**
- * Maintains a simple linked list of CacheEntry objects. It is thread safe.
+ * Maintains a simple linked list of CacheEntry objects. It is thread safe.
*
* @version $Revision$
*/
public class CacheEntryList {
-
+
// Points at the tail of the CacheEntry list
public final CacheEntry tail = new CacheEntry(null, null);
-
+
public CacheEntryList() {
tail.next = tail.previous = tail;
}
-
+
public void add(CacheEntry ce) {
addEntryBefore(tail, ce);
}
-
+
private void addEntryBefore(CacheEntry position, CacheEntry ce) {
- assert ce.key!=null && ce.next==null && ce.owner==null;
-
- synchronized( tail ) {
- ce.owner=this;
+ assert ce.key != null && ce.next == null && ce.owner == null;
+
+ synchronized (tail) {
+ ce.owner = this;
ce.next = position;
- ce.previous = position.previous;
+ ce.previous = position.previous;
ce.previous.next = ce;
ce.next.previous = ce;
}
}
-
+
public void clear() {
- synchronized( tail ) {
+ synchronized (tail) {
tail.next = tail.previous = tail;
}
}
@@ -56,19 +56,19 @@
return new CacheEvictor() {
public CacheEntry evictCacheEntry() {
CacheEntry rc;
- synchronized( tail ) {
+ synchronized (tail) {
rc = tail.next;
}
return rc.remove() ? rc : null;
}
};
}
-
+
public CacheEvictor createLIFOCacheEvictor() {
return new CacheEvictor() {
public CacheEntry evictCacheEntry() {
CacheEntry rc;
- synchronized( tail ) {
+ synchronized (tail) {
rc = tail.previous;
}
return rc.remove() ? rc : null;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java Wed Aug 8 11:56:59 2007
@@ -28,39 +28,40 @@
import java.util.concurrent.CopyOnWriteArrayList;
public class CacheEvictionUsageListener implements UsageListener {
-
+
private final static Log log = LogFactory.getLog(CacheEvictionUsageListener.class);
-
+
private final CopyOnWriteArrayList evictors = new CopyOnWriteArrayList();
private final int usageHighMark;
private final int usageLowMark;
private final TaskRunner evictionTask;
private final UsageManager usageManager;
-
- public CacheEvictionUsageListener(UsageManager usageManager, int usageHighMark, int usageLowMark, TaskRunnerFactory taskRunnerFactory) {
+
+ public CacheEvictionUsageListener(UsageManager usageManager, int usageHighMark, int usageLowMark,
+ TaskRunnerFactory taskRunnerFactory) {
this.usageManager = usageManager;
this.usageHighMark = usageHighMark;
this.usageLowMark = usageLowMark;
- evictionTask = taskRunnerFactory.createTaskRunner(new Task(){
+ evictionTask = taskRunnerFactory.createTaskRunner(new Task() {
public boolean iterate() {
return evictMessages();
}
- }, "Cache Evictor: "+System.identityHashCode(this));
+ }, "Cache Evictor: " + System.identityHashCode(this));
}
-
+
boolean evictMessages() {
// Try to take the memory usage down below the low mark.
- try {
- log.debug("Evicting cache memory usage: "+usageManager.getPercentUsage());
-
+ try {
+ log.debug("Evicting cache memory usage: " + usageManager.getPercentUsage());
+
LinkedList list = new LinkedList(evictors);
- while (list.size()>0 && usageManager.getPercentUsage() > usageLowMark) {
-
+ while (list.size() > 0 && usageManager.getPercentUsage() > usageLowMark) {
+
// Evenly evict messages from all evictors
for (Iterator iter = list.iterator(); iter.hasNext();) {
- CacheEvictor evictor = (CacheEvictor) iter.next();
- if( evictor.evictCacheEntry() == null )
+ CacheEvictor evictor = (CacheEvictor)iter.next();
+ if (evictor.evictCacheEntry() == null)
iter.remove();
}
}
@@ -68,11 +69,11 @@
}
return false;
}
-
+
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
// Do we need to start evicting cache entries? Usage > than the
// high mark
- if (oldPercentUsage < newPercentUsage && memoryManager.getPercentUsage() >= usageHighMark) {
+ if (oldPercentUsage < newPercentUsage && memoryManager.getPercentUsage() >= usageHighMark) {
try {
evictionTask.wakeup();
} catch (InterruptedException e) {
@@ -80,11 +81,11 @@
}
}
}
-
+
public void add(CacheEvictor evictor) {
evictors.add(evictor);
}
-
+
public void remove(CacheEvictor evictor) {
evictors.remove(evictor);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java Wed Aug 8 11:56:59 2007
@@ -26,10 +26,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
/**
- * Used to keep track of how much of something is being used so that
- * a productive working set usage can be controlled.
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled.
*
* Main use case is manage memory usage.
*
@@ -37,24 +36,27 @@
*
* @version $Revision: 1.3 $
*/
-public class UsageManager implements Service{
+public class UsageManager implements Service {
private static final Log log = LogFactory.getLog(UsageManager.class);
-
+
private final UsageManager parent;
private long limit;
private long usage;
-
+
private int percentUsage;
- private int percentUsageMinDelta=1;
-
+ private int percentUsageMinDelta = 1;
+
private final Object usageMutex = new Object();
-
+
private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
-
+
private boolean sendFailIfNoSpace;
- /** True if someone called setSendFailIfNoSpace() on this particular usage manager */
+ /**
+ * True if someone called setSendFailIfNoSpace() on this particular usage
+ * manager
+ */
private boolean sendFailIfNoSpaceExplicitySet;
private final boolean debug = log.isDebugEnabled();
private String name = "";
@@ -63,45 +65,46 @@
private final LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
public UsageManager() {
- this(null,"default");
+ this(null, "default");
}
-
+
/**
- * Create the memory manager linked to a parent. When the memory manager is linked to
- * a parent then when usage increased or decreased, the parent's usage is also increased
- * or decreased.
+ * Create the memory manager linked to a parent. When the memory manager is
+ * linked to a parent then when usage increased or decreased, the parent's
+ * usage is also increased or decreased.
*
* @param parent
*/
public UsageManager(UsageManager parent) {
- this(parent,"default");
+ this(parent, "default");
}
-
+
public UsageManager(String name) {
- this(null,name);
+ this(null, name);
}
-
- public UsageManager(UsageManager parent,String name) {
- this(parent,name,1.0f);
+
+ public UsageManager(UsageManager parent, String name) {
+ this(parent, name, 1.0f);
}
-
+
public UsageManager(UsageManager parent, String name, float portion) {
this.parent = parent;
- this.usagePortion=portion;
+ this.usagePortion = portion;
if (parent != null) {
- this.limit=(long)(parent.limit * portion);
- this.name= parent.name + ":";
+ this.limit = (long)(parent.limit * portion);
+ this.name = parent.name + ":";
}
this.name += name;
}
-
+
/**
- * Tries to increase the usage by value amount but blocks if this object
- * is currently full.
- * @throws InterruptedException
+ * Tries to increase the usage by value amount but blocks if this object is
+ * currently full.
+ *
+ * @throws InterruptedException
*/
public void enqueueUsage(long value) throws InterruptedException {
- waitForSpace();
+ waitForSpace();
increaseUsage(value);
}
@@ -109,10 +112,10 @@
* @throws InterruptedException
*/
public void waitForSpace() throws InterruptedException {
- if(parent!=null)
+ if (parent != null)
parent.waitForSpace();
synchronized (usageMutex) {
- for( int i=0; percentUsage >= 100 ; i++) {
+ for (int i = 0; percentUsage >= 100; i++) {
usageMutex.wait();
}
}
@@ -122,71 +125,72 @@
* @throws InterruptedException
*
* @param timeout
- */
+ */
public boolean waitForSpace(long timeout) throws InterruptedException {
- if(parent!=null) {
- if( !parent.waitForSpace(timeout) )
- return false;
+ if (parent != null) {
+ if (!parent.waitForSpace(timeout))
+ return false;
}
synchronized (usageMutex) {
- if( percentUsage >= 100 ) {
+ if (percentUsage >= 100) {
usageMutex.wait(timeout);
}
return percentUsage < 100;
}
- }
-
+ }
+
/**
- * Increases the usage by the value amount.
+ * Increases the usage by the value amount.
*
* @param value
*/
public void increaseUsage(long value) {
- if( value == 0 )
+ if (value == 0)
return;
- if(parent!=null)
+ if (parent != null)
parent.increaseUsage(value);
int percentUsage;
- synchronized(usageMutex) {
- usage+=value;
- percentUsage = caclPercentUsage();
+ synchronized (usageMutex) {
+ usage += value;
+ percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
}
-
+
/**
- * Decreases the usage by the value amount.
+ * Decreases the usage by the value amount.
*
* @param value
*/
public void decreaseUsage(long value) {
- if( value == 0 )
+ if (value == 0)
return;
- if(parent!=null)
+ if (parent != null)
parent.decreaseUsage(value);
int percentUsage;
- synchronized(usageMutex) {
- usage-=value;
- percentUsage = caclPercentUsage();
+ synchronized (usageMutex) {
+ usage -= value;
+ percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
}
-
+
public boolean isFull() {
- if(parent!=null && parent.isFull())
+ if (parent != null && parent.isFull())
return true;
synchronized (usageMutex) {
return percentUsage >= 100;
}
}
-
+
public void addUsageListener(UsageListener listener) {
listeners.add(listener);
}
+
public void removeUsageListener(UsageListener listener) {
listeners.remove(listener);
}
-
+
public long getLimit() {
synchronized (usageMutex) {
return limit;
@@ -194,64 +198,67 @@
}
/**
- * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since
- * the UsageManager is not going to be portion based off the parent.
+ * Sets the memory limit in bytes. Setting the limit in bytes will set the
+ * usagePortion to 0 since the UsageManager is not going to be portion based
+ * off the parent.
*
- * When set using XBean, you can use values such as: "20 mb", "1024 kb", or "1 gb"
+ * When set using XBean, you can use values such as: "20 mb", "1024 kb", or
+ * "1 gb"
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setLimit(long limit) {
- if(percentUsageMinDelta < 0 ) {
+ if (percentUsageMinDelta < 0) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
- synchronized(usageMutex){
- this.limit=limit;
- this.usagePortion=0;
+ synchronized (usageMutex) {
+ this.limit = limit;
+ this.usagePortion = 0;
}
onLimitChange();
}
-
- private void onLimitChange() {
-
- // We may need to calculate the limit
- if( usagePortion > 0 && parent!=null ) {
- synchronized(usageMutex){
- limit = (long)(parent.getLimit()*usagePortion);
- }
- }
-
- // Reset the percent currently being used.
+
+ private void onLimitChange() {
+
+ // We may need to calculate the limit
+ if (usagePortion > 0 && parent != null) {
+ synchronized (usageMutex) {
+ limit = (long)(parent.getLimit() * usagePortion);
+ }
+ }
+
+ // Reset the percent currently being used.
int percentUsage;
- synchronized(usageMutex){
- percentUsage=caclPercentUsage();
+ synchronized (usageMutex) {
+ percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
-
- // Let the children know that the limit has changed. They may need to set
+
+ // Let the children know that the limit has changed. They may need to
+ // set
// their limits based on ours.
- for (UsageManager child:children) {
+ for (UsageManager child : children) {
child.onLimitChange();
}
- }
+ }
- public float getUsagePortion() {
- synchronized(usageMutex){
- return usagePortion;
+ public float getUsagePortion() {
+ synchronized (usageMutex) {
+ return usagePortion;
}
- }
+ }
- public void setUsagePortion(float usagePortion) {
- synchronized(usageMutex){
- this.usagePortion = usagePortion;
+ public void setUsagePortion(float usagePortion) {
+ synchronized (usageMutex) {
+ this.usagePortion = usagePortion;
}
onLimitChange();
- }
+ }
/*
- * Sets the minimum number of percentage points the usage has to change before a UsageListener
- * event is fired by the manager.
- */
+ * Sets the minimum number of percentage points the usage has to change
+ * before a UsageListener event is fired by the manager.
+ */
public int getPercentUsage() {
synchronized (usageMutex) {
return percentUsage;
@@ -265,20 +272,20 @@
}
/**
- * Sets the minimum number of percentage points the usage has to change before a UsageListener
- * event is fired by the manager.
+ * Sets the minimum number of percentage points the usage has to change
+ * before a UsageListener event is fired by the manager.
*
* @param percentUsageMinDelta
*/
public void setPercentUsageMinDelta(int percentUsageMinDelta) {
- if(percentUsageMinDelta < 1) {
+ if (percentUsageMinDelta < 1) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
}
int percentUsage;
synchronized (usageMutex) {
this.percentUsageMinDelta = percentUsageMinDelta;
- percentUsage = caclPercentUsage();
- }
+ percentUsage = caclPercentUsage();
+ }
setPercentUsage(percentUsage);
}
@@ -287,10 +294,11 @@
return usage;
}
}
-
+
/**
- * Sets whether or not a send() should fail if there is no space free. The default
- * value is false which means to block the send() method until space becomes available
+ * Sets whether or not a send() should fail if there is no space free. The
+ * default value is false which means to block the send() method until space
+ * becomes available
*/
public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) {
sendFailIfNoSpaceExplicitySet = true;
@@ -304,106 +312,106 @@
return parent.isSendFailIfNoSpace();
}
}
-
+
private void setPercentUsage(int value) {
synchronized (usageMutex) {
int oldValue = percentUsage;
percentUsage = value;
- if( oldValue!=value ) {
+ if (oldValue != value) {
fireEvent(oldValue, value);
}
}
}
-
+
private int caclPercentUsage() {
- if( limit==0 ) return 0;
- return (int)((((usage*100)/limit)/percentUsageMinDelta)*percentUsageMinDelta);
+ if (limit == 0)
+ return 0;
+ return (int)((((usage * 100) / limit) / percentUsageMinDelta) * percentUsageMinDelta);
}
-
- private void fireEvent(int oldPercentUsage,int newPercentUsage){
+
+ private void fireEvent(int oldPercentUsage, int newPercentUsage) {
if (debug) {
- log.debug("Memory usage change. from: "+oldPercentUsage+", to: "+newPercentUsage);
+ log.debug("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage);
}
// Switching from being full to not being full..
- if(oldPercentUsage>=100&&newPercentUsage<100){
- synchronized(usageMutex){
+ if (oldPercentUsage >= 100 && newPercentUsage < 100) {
+ synchronized (usageMutex) {
usageMutex.notifyAll();
for (Iterator iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
- Runnable callback = (Runnable) iter.next();
- callback.run();
- }
+ Runnable callback = (Runnable)iter.next();
+ callback.run();
+ }
callbacks.clear();
}
}
// Let the listeners know
- for(Iterator iter=listeners.iterator();iter.hasNext();){
- UsageListener l=(UsageListener)iter.next();
- l.onMemoryUseChanged(this,oldPercentUsage,newPercentUsage);
+ for (Iterator iter = listeners.iterator(); iter.hasNext();) {
+ UsageListener l = (UsageListener)iter.next();
+ l.onMemoryUseChanged(this, oldPercentUsage, newPercentUsage);
}
}
-
+
public String getName() {
return name;
}
- public String toString(){
-
-
- return "UsageManager("+ getName() +") percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta="
- +percentUsageMinDelta+"%";
+ public String toString() {
+
+ return "UsageManager(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + usage
+ + " limit=" + limit + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
}
- public void start(){
- if(parent!=null){
+ public void start() {
+ if (parent != null) {
parent.addChild(this);
}
}
- public void stop(){
- if(parent!=null){
+ public void stop() {
+ if (parent != null) {
parent.removeChild(this);
}
}
- private void addChild(UsageManager child){
+ private void addChild(UsageManager child) {
children.add(child);
}
- private void removeChild(UsageManager child){
+ private void removeChild(UsageManager child) {
children.remove(child);
}
-
+
/**
* @param callback
- * @return true if the UsageManager was full. The callback will only be called if this method returns true.
+ * @return true if the UsageManager was full. The callback will only be
+ * called if this method returns true.
*/
- public boolean notifyCallbackWhenNotFull( final Runnable callback ) {
-
- if(parent!=null) {
- Runnable r = new Runnable(){
- public void run() {
- synchronized (usageMutex) {
- if( percentUsage >= 100 ) {
- callbacks.add(callback);
- } else {
- callback.run();
- }
- }
- }
+ public boolean notifyCallbackWhenNotFull(final Runnable callback) {
+
+ if (parent != null) {
+ Runnable r = new Runnable() {
+ public void run() {
+ synchronized (usageMutex) {
+ if (percentUsage >= 100) {
+ callbacks.add(callback);
+ } else {
+ callback.run();
+ }
+ }
+ }
};
- if( parent.notifyCallbackWhenNotFull(r) ) {
- return true;
- }
- }
- synchronized (usageMutex) {
- if( percentUsage >= 100 ) {
- callbacks.add(callback);
- return true;
+ if (parent.notifyCallbackWhenNotFull(r)) {
+ return true;
+ }
+ }
+ synchronized (usageMutex) {
+ if (percentUsage >= 100) {
+ callbacks.add(callback);
+ return true;
} else {
- return false;
+ return false;
}
}
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java Wed Aug 8 11:56:59 2007
@@ -37,29 +37,28 @@
public Object put(Object key, Object value) {
long usage = getUsageOfAddedObject(value);
Object rc = super.put(key, value);
- if( rc !=null ) {
+ if (rc != null) {
usage -= getUsageOfRemovedObject(rc);
}
totalUsage.addAndGet(usage);
um.increaseUsage(usage);
return rc;
}
-
+
public Object remove(Object key) {
Object rc = super.remove(key);
- if( rc !=null ) {
+ if (rc != null) {
long usage = getUsageOfRemovedObject(rc);
totalUsage.addAndGet(-usage);
um.decreaseUsage(usage);
}
return rc;
}
-
-
+
protected long getUsageOfAddedObject(Object value) {
return 1;
}
-
+
protected long getUsageOfRemovedObject(Object value) {
return 1;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java Wed Aug 8 11:56:59 2007
@@ -30,14 +30,14 @@
/**
* A simple fixed size {@link MessageList} where there is a single, fixed size
- * list that all messages are added to for simplicity. Though this
- * will lead to possibly slow recovery times as many more messages
- * than is necessary will have to be iterated through for each subscription.
+ * list that all messages are added to for simplicity. Though this will lead to
+ * possibly slow recovery times as many more messages than is necessary will
+ * have to be iterated through for each subscription.
*
* @version $Revision: 1.1 $
*/
public class SimpleMessageList implements MessageList {
- static final private Log log=LogFactory.getLog(SimpleMessageList.class);
+ static final private Log log = LogFactory.getLog(SimpleMessageList.class);
private LinkedList list = new LinkedList();
private int maximumSize = 100 * 64 * 1024;
private int size;
@@ -56,7 +56,7 @@
list.add(node);
size += delta;
while (size > maximumSize) {
- MessageReference evicted = (MessageReference) list.removeFirst();
+ MessageReference evicted = (MessageReference)list.removeFirst();
size -= evicted.getMessageHardRef().getSize();
}
}
@@ -65,23 +65,23 @@
public List getMessages(ActiveMQDestination destination) {
return getList();
}
-
+
public Message[] browse(ActiveMQDestination destination) {
List result = new ArrayList();
- DestinationFilter filter=DestinationFilter.parseFilter(destination);
- synchronized(lock){
- for (Iterator i = list.iterator(); i.hasNext();){
+ DestinationFilter filter = DestinationFilter.parseFilter(destination);
+ synchronized (lock) {
+ for (Iterator i = list.iterator(); i.hasNext();) {
MessageReference ref = (MessageReference)i.next();
Message msg;
- try{
- msg=ref.getMessage();
- if (filter.matches(msg.getDestination())){
+ try {
+ msg = ref.getMessage();
+ if (filter.matches(msg.getDestination())) {
result.add(msg);
}
- }catch(IOException e){
- log.error("Failed to get Message from MessageReference: " + ref,e);
+ } catch (IOException e) {
+ log.error("Failed to get Message from MessageReference: " + ref, e);
}
-
+
}
}
return (Message[])result.toArray(new Message[result.size()]);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java Wed Aug 8 11:56:59 2007
@@ -37,26 +37,27 @@
*/
public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport {
- protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
+ protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
protected Object brokerInfoMutex = new Object();
- public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker, Transport remoteBroker) {
- super(configuration,localBroker, remoteBroker);
+ public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
+ Transport remoteBroker) {
+ super(configuration, localBroker, remoteBroker);
remoteBrokerName = remoteBroker.toString();
- remoteBrokerNameKnownLatch.countDown();
+ remoteBrokerNameKnownLatch.countDown();
}
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
synchronized (brokerInfoMutex) {
- BrokerInfo remoteBrokerInfo = (BrokerInfo) command;
+ BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
BrokerId remoteBrokerId = remoteBrokerInfo.getBrokerId();
-
- // lets associate the incoming endpoint with a broker ID so we can refer to it later
+
+ // lets associate the incoming endpoint with a broker ID so we can
+ // refer to it later
Endpoint from = command.getFrom();
if (from == null) {
log.warn("Incoming command does not have a from endpoint: " + command);
- }
- else {
+ } else {
from.setBrokerInfo(remoteBrokerInfo);
}
if (localBrokerId != null) {
@@ -84,15 +85,14 @@
Endpoint from = command.getFrom();
if (from == null) {
log.warn("Incoming command does not have a from endpoint: " + command);
- }
- else {
+ } else {
answer = from.getBrokerId();
}
- if (answer != null) {
+ if (answer != null) {
return answer;
- }
- else {
- throw new IOException("No broker ID is available for endpoint: " + from + " from command: " + command);
+ } else {
+ throw new IOException("No broker ID is available for endpoint: " + from + " from command: "
+ + command);
}
}
@@ -103,8 +103,8 @@
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL());
}
-
- protected BrokerId[] getRemoteBrokerPath(){
+
+ protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath;
}