You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC

svn commit: r563982 [6/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java Wed Aug  8 11:56:59 2007
@@ -32,125 +32,124 @@
  */
 public final class DataManagerFacade implements org.apache.activemq.kaha.impl.DataManager {
 
-	private static class StoreLocationFacade implements StoreLocation {
-		private final Location location;
+    private static class StoreLocationFacade implements StoreLocation {
+        private final Location location;
 
-		public StoreLocationFacade(Location location) {
-			this.location = location;
-		}
-
-		public int getFile() {
-			return location.getDataFileId();
-		}
-
-		public long getOffset() {
-			return location.getOffset();
-		}
-
-		public int getSize() {
-			return location.getSize();
-		}
-
-		public Location getLocation() {
-			return location;
-		}
-	}
-
-	static private StoreLocation convertToStoreLocation(Location location) {
-		if(location==null)
-			return null;
-		return new StoreLocationFacade(location);
-	}
-	
-	static private Location convertFromStoreLocation(StoreLocation location) {
-		
-		if(location==null)
-			return null;
-		
-		if( location.getClass()== StoreLocationFacade.class )
-			return ((StoreLocationFacade)location).getLocation();
-		
-		Location l = new Location();
-		l.setOffset((int) location.getOffset());
-		l.setSize(location.getSize());
-		l.setDataFileId(location.getFile());
-		return l;
-	}
-
-	static final private ByteSequence FORCE_COMMAND = new ByteSequence(new byte[]{'F', 'O', 'R', 'C', 'E'});
-	
-	AsyncDataManager dataManager;
-	private final String name;
-	private Marshaller redoMarshaller;
-	
-	
-	public DataManagerFacade(AsyncDataManager dataManager, String name) {
-		this.dataManager=dataManager;
-		this.name = name;
-	}
-	
-	public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
-		ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
-		DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
+        public StoreLocationFacade(Location location) {
+            this.location = location;
+        }
+
+        public int getFile() {
+            return location.getDataFileId();
+        }
+
+        public long getOffset() {
+            return location.getOffset();
+        }
+
+        public int getSize() {
+            return location.getSize();
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+    }
+
+    static private StoreLocation convertToStoreLocation(Location location) {
+        if (location == null)
+            return null;
+        return new StoreLocationFacade(location);
+    }
+
+    static private Location convertFromStoreLocation(StoreLocation location) {
+
+        if (location == null)
+            return null;
+
+        if (location.getClass() == StoreLocationFacade.class)
+            return ((StoreLocationFacade)location).getLocation();
+
+        Location l = new Location();
+        l.setOffset((int)location.getOffset());
+        l.setSize(location.getSize());
+        l.setDataFileId(location.getFile());
+        return l;
+    }
+
+    static final private ByteSequence FORCE_COMMAND = new ByteSequence(new byte[] {'F', 'O', 'R', 'C', 'E'});
+
+    AsyncDataManager dataManager;
+    private final String name;
+    private Marshaller redoMarshaller;
+
+    public DataManagerFacade(AsyncDataManager dataManager, String name) {
+        this.dataManager = dataManager;
+        this.name = name;
+    }
+
+    public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
+        ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
+        DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
         return marshaller.readPayload(dataIn);
-	}
+    }
 
+    public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        marshaller.writePayload(payload, buffer);
+        ByteSequence data = buffer.toByteSequence();
+        return convertToStoreLocation(dataManager.write(data, (byte)1, false));
+    }
+
+    public void force() throws IOException {
+        dataManager.write(FORCE_COMMAND, (byte)2, true);
+    }
+
+    public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
+        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        marshaller.writePayload(payload, buffer);
+        ByteSequence data = buffer.toByteSequence();
+        dataManager.update(convertFromStoreLocation(location), data, false);
+    }
+
+    public void close() throws IOException {
+        dataManager.close();
+    }
+
+    public void consolidateDataFiles() throws IOException {
+        dataManager.consolidateDataFiles();
+    }
+
+    public boolean delete() throws IOException {
+        return dataManager.delete();
+    }
+
+    public void addInterestInFile(int file) throws IOException {
+        dataManager.addInterestInFile(file);
+    }
+
+    public void removeInterestInFile(int file) throws IOException {
+        dataManager.removeInterestInFile(file);
+    }
+
+    public void recoverRedoItems(RedoListener listener) throws IOException {
+        throw new RuntimeException("Not Implemented..");
+    }
+
+    public StoreLocation storeRedoItem(Object payload) throws IOException {
+        throw new RuntimeException("Not Implemented..");
+    }
+
+    public Marshaller getRedoMarshaller() {
+        return redoMarshaller;
+    }
+
+    public void setRedoMarshaller(Marshaller redoMarshaller) {
+        this.redoMarshaller = redoMarshaller;
+    }
+
+    public String getName() {
+        return name;
+    }
 
-	public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
-    	final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
-        marshaller.writePayload(payload,buffer);	
-		ByteSequence data = buffer.toByteSequence();		
-		return convertToStoreLocation(dataManager.write(data, (byte)1, false));
-	}
-
-
-	public void force() throws IOException {
-		dataManager.write(FORCE_COMMAND, (byte)2, true);
-	}
-
-	public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
-    	final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
-        marshaller.writePayload(payload,buffer);	
-		ByteSequence data = buffer.toByteSequence();		
-		dataManager.update(convertFromStoreLocation(location), data, false);
-	}
-	
-	public void close() throws IOException {
-		dataManager.close();
-	}
-
-	public void consolidateDataFiles() throws IOException {
-		dataManager.consolidateDataFiles();
-	}
-
-	public boolean delete() throws IOException {
-		return dataManager.delete();
-	}
- 	
-	public void addInterestInFile(int file) throws IOException {
-		dataManager.addInterestInFile(file);
-	}
-	public void removeInterestInFile(int file) throws IOException {
-		dataManager.removeInterestInFile(file);
-	}
-
-	public void recoverRedoItems(RedoListener listener) throws IOException {
-		throw new RuntimeException("Not Implemented..");
-	}
-	public StoreLocation storeRedoItem(Object payload) throws IOException {
-		throw new RuntimeException("Not Implemented..");
-	}
-
-	public Marshaller getRedoMarshaller() {
-		return redoMarshaller;
-	}	
-	public void setRedoMarshaller(Marshaller redoMarshaller) {
-		this.redoMarshaller = redoMarshaller;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java Wed Aug  8 11:56:59 2007
@@ -33,75 +33,77 @@
  */
 public final class JournalFacade implements Journal {
 
-	
-	public static class RecordLocationFacade implements RecordLocation {
-		private final Location location;
-
-		public RecordLocationFacade(Location location) {
-			this.location = location;
-		}
-
-		public Location getLocation() {
-			return location;
-		}
-
-		public int compareTo(Object o) {
-			RecordLocationFacade rlf = (RecordLocationFacade)o;
-			int rc = location.compareTo(rlf.location);
-			return rc;
-		}
-	}
-
-	static private RecordLocation convertToRecordLocation(Location location) {
-		if(location==null)
-			return null;
-		return new RecordLocationFacade(location);
-	}
-	
-	static private Location convertFromRecordLocation(RecordLocation location) {
-		
-		if(location==null)
-			return null;
-		
-		return ((RecordLocationFacade)location).getLocation();
-	}
-
-	AsyncDataManager dataManager;
-	
-	public JournalFacade(AsyncDataManager dataManager) {
-		this.dataManager = dataManager;
-	}
-
-	public void close() throws IOException {
-		dataManager.close();
-	}
-
-	public RecordLocation getMark() throws IllegalStateException {
-		return convertToRecordLocation(dataManager.getMark());
-	}
-
-	public RecordLocation getNextRecordLocation(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
-		return convertToRecordLocation(dataManager.getNextLocation(convertFromRecordLocation(location)));
-	}
-
-	public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
-		ByteSequence rc = dataManager.read(convertFromRecordLocation(location));
-		if( rc == null )
-			return null;
-		return new ByteArrayPacket(rc.getData(), rc.getOffset(), rc.getLength());
-	}
-
-	public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
-	}
-
-	public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException, IOException, IllegalStateException {
-		dataManager.setMark(convertFromRecordLocation(location), sync);
-	}
-
-	public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
-		org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
-		ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
-		return convertToRecordLocation(dataManager.write(sequence, sync));
-	}
-	
+    public static class RecordLocationFacade implements RecordLocation {
+        private final Location location;
+
+        public RecordLocationFacade(Location location) {
+            this.location = location;
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+
+        public int compareTo(Object o) {
+            RecordLocationFacade rlf = (RecordLocationFacade)o;
+            int rc = location.compareTo(rlf.location);
+            return rc;
+        }
+    }
+
+    static private RecordLocation convertToRecordLocation(Location location) {
+        if (location == null)
+            return null;
+        return new RecordLocationFacade(location);
+    }
+
+    static private Location convertFromRecordLocation(RecordLocation location) {
+
+        if (location == null)
+            return null;
+
+        return ((RecordLocationFacade)location).getLocation();
+    }
+
+    AsyncDataManager dataManager;
+
+    public JournalFacade(AsyncDataManager dataManager) {
+        this.dataManager = dataManager;
+    }
+
+    public void close() throws IOException {
+        dataManager.close();
+    }
+
+    public RecordLocation getMark() throws IllegalStateException {
+        return convertToRecordLocation(dataManager.getMark());
+    }
+
+    public RecordLocation getNextRecordLocation(RecordLocation location)
+        throws InvalidRecordLocationException, IOException, IllegalStateException {
+        return convertToRecordLocation(dataManager.getNextLocation(convertFromRecordLocation(location)));
+    }
+
+    public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException,
+        IllegalStateException {
+        ByteSequence rc = dataManager.read(convertFromRecordLocation(location));
+        if (rc == null)
+            return null;
+        return new ByteArrayPacket(rc.getData(), rc.getOffset(), rc.getLength());
+    }
+
+    public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
+    }
+
+    public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException,
+        IOException, IllegalStateException {
+        dataManager.setMark(convertFromRecordLocation(location), sync);
+    }
+
+    public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
+        org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
+        ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
+        return convertToRecordLocation(dataManager.write(sequence, sync));
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java Wed Aug  8 11:56:59 2007
@@ -27,121 +27,124 @@
  * @version $Revision: 1.2 $
  */
 public final class Location implements Comparable<Location> {
-    
-    public static final byte MARK_TYPE=-1;
-    public static final byte USER_TYPE=1;    
-    public static final byte NOT_SET_TYPE=0;    
-    public static final int NOT_SET=-1;
-
-    private int dataFileId=NOT_SET;
-    private int offset=NOT_SET;
-    private int size=NOT_SET;
-    private byte type=NOT_SET_TYPE;
+
+    public static final byte MARK_TYPE = -1;
+    public static final byte USER_TYPE = 1;
+    public static final byte NOT_SET_TYPE = 0;
+    public static final int NOT_SET = -1;
+
+    private int dataFileId = NOT_SET;
+    private int offset = NOT_SET;
+    private int size = NOT_SET;
+    private byte type = NOT_SET_TYPE;
     private CountDownLatch latch;
 
-    public Location(){}
-    
+    public Location() {
+    }
+
     Location(Location item) {
         this.dataFileId = item.dataFileId;
         this.offset = item.offset;
         this.size = item.size;
         this.type = item.type;
     }
-    
-    boolean isValid(){
+
+    boolean isValid() {
         return dataFileId != NOT_SET;
     }
 
     /**
      * @return the size of the data record including the header.
      */
-    public int getSize(){
+    public int getSize() {
         return size;
     }
 
     /**
      * @param size the size of the data record including the header.
      */
-    public void setSize(int size){
-        this.size=size;
+    public void setSize(int size) {
+        this.size = size;
     }
 
     /**
      * @return the size of the payload of the record.
      */
     public int getPaylodSize() {
-        return size-AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
-    }  
-    
-    public int getOffset(){
+        return size - AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+    }
+
+    public int getOffset() {
         return offset;
     }
-    public void setOffset(int offset){
-        this.offset=offset;
+
+    public void setOffset(int offset) {
+        this.offset = offset;
     }
 
-    public int getDataFileId(){
+    public int getDataFileId() {
         return dataFileId;
     }
 
-    public void setDataFileId(int file){
-        this.dataFileId=file;
+    public void setDataFileId(int file) {
+        this.dataFileId = file;
     }
 
-	public byte getType() {
-		return type;
-	}
+    public byte getType() {
+        return type;
+    }
 
-	public void setType(byte type) {
-		this.type = type;
-	}
+    public void setType(byte type) {
+        this.type = type;
+    }
 
-	public String toString(){
-        String result="offset = "+offset+", file = " + dataFileId + ", size = "+size + ", type = "+type;
+    public String toString() {
+        String result = "offset = " + offset + ", file = " + dataFileId + ", size = " + size + ", type = "
+                        + type;
         return result;
     }
 
-	public void writeExternal(DataOutput dos) throws IOException {
-		dos.writeInt(dataFileId);
-		dos.writeInt(offset);
-		dos.writeInt(size);
-		dos.writeByte(type);
-	}
-
-	public void readExternal(DataInput dis) throws IOException {
-		dataFileId = dis.readInt();
-		offset = dis.readInt();
-		size = dis.readInt();
-		type = dis.readByte();
-	}
-
-	public CountDownLatch getLatch() {
-		return latch;
-	}
-	public void setLatch(CountDownLatch latch) {
-		this.latch = latch;
-	}
-
-	public int compareTo(Location o) {
-		Location l = (Location)o;
-		if( dataFileId == l.dataFileId ) {
-			int rc = offset-l.offset;
-			return rc;
-		}
-		return dataFileId - l.dataFileId;
-	}
-    
+    public void writeExternal(DataOutput dos) throws IOException {
+        dos.writeInt(dataFileId);
+        dos.writeInt(offset);
+        dos.writeInt(size);
+        dos.writeByte(type);
+    }
+
+    public void readExternal(DataInput dis) throws IOException {
+        dataFileId = dis.readInt();
+        offset = dis.readInt();
+        size = dis.readInt();
+        type = dis.readByte();
+    }
+
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public int compareTo(Location o) {
+        Location l = (Location)o;
+        if (dataFileId == l.dataFileId) {
+            int rc = offset - l.offset;
+            return rc;
+        }
+        return dataFileId - l.dataFileId;
+    }
+
     public boolean equals(Object o) {
         boolean result = false;
         if (o instanceof Location) {
-            result = compareTo((Location)o)==0;
+            result = compareTo((Location)o) == 0;
         }
         return result;
     }
-    
+
     public int hashCode() {
         return dataFileId ^ offset;
     }
-
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Wed Aug  8 11:56:59 2007
@@ -22,191 +22,194 @@
 import java.nio.channels.FileChannel;
 
 /**
- * An AsyncDataFileAppender that uses NIO ByteBuffers and File chanels to more efficently
- * copy data to files.
+ * An AsyncDataFileAppender that uses NIO ByteBuffers and File chanels to more
+ * efficently copy data to files.
  * 
  * @version $Revision: 1.1.1.1 $
  */
 class NIODataFileAppender extends DataFileAppender {
-    
+
     public NIODataFileAppender(AsyncDataManager fileManager) {
-		super(fileManager);
-	}
+        super(fileManager);
+    }
 
-	/**
-     * The async processing loop that writes to the data files and
-     * does the force calls.  
+    /**
+     * The async processing loop that writes to the data files and does the
+     * force calls.
      * 
-     * Since the file sync() call is the slowest of all the operations, 
-     * this algorithm tries to 'batch' or group together several file sync() requests 
-     * into a single file sync() call. The batching is accomplished attaching the 
-     * same CountDownLatch instance to every force request in a group.
+     * Since the file sync() call is the slowest of all the operations, this
+     * algorithm tries to 'batch' or group together several file sync() requests
+     * into a single file sync() call. The batching is accomplished attaching
+     * the same CountDownLatch instance to every force request in a group.
      * 
      */
     protected void processQueue() {
-		DataFile dataFile=null;
-		RandomAccessFile file=null;
-    	FileChannel channel=null;
-
-    	try {
-    		
-        	ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
-        	ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
-    		ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);
-    		
-    		// Populate the static parts of the headers and footers..
-    		header.putInt(0); // size
-    		header.put((byte) 0); // type
-    		header.put(RESERVED_SPACE); // reserved
-    		header.put(AsyncDataManager.ITEM_HEAD_SOR);    		
-    		footer.put(AsyncDataManager.ITEM_HEAD_EOR);
-    		
-	    	while( true ) {
-	    		
-	    		Object o = null;
-
-	    		// Block till we get a command.
-	    		synchronized(enqueueMutex) {
-	    			while( true ) {
-	    				if( shutdown ) {
-	    					o = SHUTDOWN_COMMAND;
-	    					break;
-	    				}
-	    				if( nextWriteBatch!=null ) {
-	    					o = nextWriteBatch;
-	    					nextWriteBatch=null;
-	    					break;
-	    				}
-	    				enqueueMutex.wait();
-	    			}
-	    			enqueueMutex.notify();
-	            }        
-	    		
-	    		
-	        	if( o == SHUTDOWN_COMMAND ) {
-	        		break;
-	        	} 
-	        	
-	        	WriteBatch wb = (WriteBatch) o;
-				if( dataFile != wb.dataFile ) {
-	        		if( file!=null ) {
-	        			dataFile.closeRandomAccessFile(file);
-	        		}
-	        		dataFile = wb.dataFile;
-	        		file = dataFile.openRandomAccessFile(true);
-	        		channel = file.getChannel();
-	        	}
-	        	
-	        	WriteCommand write = wb.first;
-	        	
-	        	// Write all the data.
-				// Only need to seek to first location.. all others 
-				// are in sequence.
-	        	file.seek(write.location.getOffset());
-	 
-	        	// 
-        		// is it just 1 big write?
-	        	if( wb.size == write.location.getSize() ) {
-	        		
-	        		header.clear();
-	        		header.putInt(write.location.getSize());
-	        		header.put(write.location.getType());
-	        		header.clear();
-	                transfer(header, channel);
-	                ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
-	                transfer(source, channel);
-		        	footer.clear();
-	                transfer(footer, channel);
-		        	
-	        	} else {
-	        		
-	        		// Combine the smaller writes into 1 big buffer
-		        	while( write!=null ) {
-	        		
-		        		header.clear();
-		        		header.putInt(write.location.getSize());
-		        		header.put(write.location.getType());
-		        		header.clear();
-		        		copy(header, buffer);
-		        		assert !header.hasRemaining();
-		        		
-		                ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
-		                copy(source, buffer);
-		        		assert !source.hasRemaining();
-
-		        		footer.clear();
-		        		copy(footer, buffer);
-		        		assert !footer.hasRemaining();
-	
-		        		write = (WriteCommand) write.getNext();
-		        	}
-		        	
-	    			// Fully write out the buffer..
-	    			buffer.flip();
-	                transfer(buffer, channel);
-	                buffer.clear();
-		        }
-		        	
-    			file.getChannel().force(false);
-
-    			WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
-    			dataManager.setLastAppendLocation( lastWrite.location );
-
-    			// Signal any waiting threads that the write is on disk.
-    			if( wb.latch!=null ) {
-    				wb.latch.countDown();
-    			}
-    			
-    			// Now that the data is on disk, remove the writes from the in flight
-    			// cache.
-	        	write = wb.first;
-	        	while( write!=null ) {
-	        		if( !write.sync ) {
-	        			inflightWrites.remove(new WriteKey(write.location));
-	        		}
-	        		write = (WriteCommand) write.getNext();
-	        	}
-	    	}
-	    	
-		} catch (IOException e) {
-	    	synchronized( enqueueMutex ) {
-	    		firstAsyncException = e;
-	    	}
-		} catch (InterruptedException e) {
-		} finally {
-    		try {
-				if( file!=null ) {
-					dataFile.closeRandomAccessFile(file);
-				}
-			} catch (IOException e) {
-			}
-    		shutdownDone.countDown();
-    	}
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        FileChannel channel = null;
+
+        try {
+
+            ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
+            ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
+            ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);
+
+            // Populate the static parts of the headers and footers..
+            header.putInt(0); // size
+            header.put((byte)0); // type
+            header.put(RESERVED_SPACE); // reserved
+            header.put(AsyncDataManager.ITEM_HEAD_SOR);
+            footer.put(AsyncDataManager.ITEM_HEAD_EOR);
+
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (shutdown) {
+                            o = SHUTDOWN_COMMAND;
+                            break;
+                        }
+                        if (nextWriteBatch != null) {
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notify();
+                }
+
+                if (o == SHUTDOWN_COMMAND) {
+                    break;
+                }
+
+                WriteBatch wb = (WriteBatch)o;
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile(true);
+                    channel = file.getChannel();
+                }
+
+                WriteCommand write = wb.first;
+
+                // Write all the data.
+                // Only need to seek to first location.. all others
+                // are in sequence.
+                file.seek(write.location.getOffset());
+
+                // 
+                // is it just 1 big write?
+                if (wb.size == write.location.getSize()) {
+
+                    header.clear();
+                    header.putInt(write.location.getSize());
+                    header.put(write.location.getType());
+                    header.clear();
+                    transfer(header, channel);
+                    ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+                                                        write.data.getLength());
+                    transfer(source, channel);
+                    footer.clear();
+                    transfer(footer, channel);
+
+                } else {
+
+                    // Combine the smaller writes into 1 big buffer
+                    while (write != null) {
+
+                        header.clear();
+                        header.putInt(write.location.getSize());
+                        header.put(write.location.getType());
+                        header.clear();
+                        copy(header, buffer);
+                        assert !header.hasRemaining();
+
+                        ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+                                                            write.data.getLength());
+                        copy(source, buffer);
+                        assert !source.hasRemaining();
+
+                        footer.clear();
+                        copy(footer, buffer);
+                        assert !footer.hasRemaining();
+
+                        write = (WriteCommand)write.getNext();
+                    }
+
+                    // Fully write out the buffer..
+                    buffer.flip();
+                    transfer(buffer, channel);
+                    buffer.clear();
+                }
+
+                file.getChannel().force(false);
+
+                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+                dataManager.setLastAppendLocation(lastWrite.location);
+
+                // Signal any waiting threads that the write is on disk.
+                if (wb.latch != null) {
+                    wb.latch.countDown();
+                }
+
+                // Now that the data is on disk, remove the writes from the in
+                // flight
+                // cache.
+                write = wb.first;
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    write = (WriteCommand)write.getNext();
+                }
+            }
+
+        } catch (IOException e) {
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+            }
+        } catch (InterruptedException e) {
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (IOException e) {
+            }
+            shutdownDone.countDown();
+        }
     }
 
     /**
      * Copy the bytes in header to the channel.
+     * 
      * @param header - source of data
      * @param channel - destination where the data will be written.
      * @throws IOException
      */
-	private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
-		while (header.hasRemaining()) {
-		    channel.write(header);                
-		}
-	}
-
-	private int copy(ByteBuffer src, ByteBuffer dest) {
-	    int rc = Math.min(dest.remaining(), src.remaining()); 
-		if( rc > 0 ) {
-		    // Adjust our limit so that we don't overflow the dest buffer. 
-			int limit = src.limit();
-			src.limit(src.position()+rc);
+    private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
+        while (header.hasRemaining()) {
+            channel.write(header);
+        }
+    }
+
+    private int copy(ByteBuffer src, ByteBuffer dest) {
+        int rc = Math.min(dest.remaining(), src.remaining());
+        if (rc > 0) {
+            // Adjust our limit so that we don't overflow the dest buffer.
+            int limit = src.limit();
+            src.limit(src.position() + rc);
             dest.put(src);
             // restore the limit.
-			src.limit(limit);
-		}
-		return rc;
-	}
-        
+            src.limit(limit);
+        }
+        return rc;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java Wed Aug  8 11:56:59 2007
@@ -24,85 +24,85 @@
 import java.util.Set;
 
 /**
-* Set of Map.Entry objects for a container
-* 
-* @version $Revision: 1.2 $
-*/
-public class ContainerEntrySet extends ContainerCollectionSupport implements Set{
-    ContainerEntrySet(MapContainerImpl container){
+ * Set of Map.Entry objects for a container
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerEntrySet extends ContainerCollectionSupport implements Set {
+    ContainerEntrySet(MapContainerImpl container) {
         super(container);
     }
 
-    public boolean contains(Object o){
+    public boolean contains(Object o) {
         return container.entrySet().contains(o);
     }
 
-    public Iterator iterator(){
-        return new ContainerEntrySetIterator(container,buildEntrySet().iterator());
+    public Iterator iterator() {
+        return new ContainerEntrySetIterator(container, buildEntrySet().iterator());
     }
 
-    public Object[] toArray(){
+    public Object[] toArray() {
         return buildEntrySet().toArray();
     }
 
-    public Object[] toArray(Object[] a){
+    public Object[] toArray(Object[] a) {
         return buildEntrySet().toArray(a);
     }
 
-    public boolean add(Object o){
+    public boolean add(Object o) {
         throw new UnsupportedOperationException("Cannot add here");
     }
 
-    public boolean remove(Object o){
-        boolean result=false;
-        if(buildEntrySet().remove(o)){
-            ContainerMapEntry entry=(ContainerMapEntry) o;
+    public boolean remove(Object o) {
+        boolean result = false;
+        if (buildEntrySet().remove(o)) {
+            ContainerMapEntry entry = (ContainerMapEntry)o;
             container.remove(entry.getKey());
         }
         return result;
     }
 
-    public boolean containsAll(Collection c){
+    public boolean containsAll(Collection c) {
         return buildEntrySet().containsAll(c);
     }
 
-    public boolean addAll(Collection c){
+    public boolean addAll(Collection c) {
         throw new UnsupportedOperationException("Cannot add here");
     }
 
-    public boolean retainAll(Collection c){
-        List tmpList=new ArrayList();
-        for(Iterator i=c.iterator();i.hasNext();){
-            Object o=i.next();
-            if(!contains(o)){
+    public boolean retainAll(Collection c) {
+        List tmpList = new ArrayList();
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            Object o = i.next();
+            if (!contains(o)) {
                 tmpList.add(o);
             }
         }
-        boolean result=false;
-        for(Iterator i=tmpList.iterator();i.hasNext();){
-            result|=remove(i.next());
+        boolean result = false;
+        for (Iterator i = tmpList.iterator(); i.hasNext();) {
+            result |= remove(i.next());
         }
         return result;
     }
 
-    public boolean removeAll(Collection c){
-        boolean result=true;
-        for(Iterator i=c.iterator();i.hasNext();){
-            if(!remove(i.next())){
-                result=false;
+    public boolean removeAll(Collection c) {
+        boolean result = true;
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            if (!remove(i.next())) {
+                result = false;
             }
         }
         return result;
     }
 
-    public void clear(){
+    public void clear() {
         container.clear();
     }
 
-    protected Set buildEntrySet(){
-        Set set=new HashSet();
-        for(Iterator i=container.keySet().iterator();i.hasNext();){
-            ContainerMapEntry entry=new ContainerMapEntry(container,i.next());
+    protected Set buildEntrySet() {
+        Set set = new HashSet();
+        for (Iterator i = container.keySet().iterator(); i.hasNext();) {
+            ContainerMapEntry entry = new ContainerMapEntry(container, i.next());
             set.add(entry);
         }
         return set;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java Wed Aug  8 11:56:59 2007
@@ -18,33 +18,33 @@
 
 import java.util.Iterator;
 
-
 /**
-* An Iterator for a container entry Set
-* 
-* @version $Revision: 1.2 $
-*/
-public class ContainerEntrySetIterator implements Iterator{
+ * An Iterator for a container entry Set
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerEntrySetIterator implements Iterator {
     private MapContainerImpl container;
-    private Iterator  iter;
+    private Iterator iter;
     private ContainerMapEntry currentEntry;
-    ContainerEntrySetIterator(MapContainerImpl container,Iterator iter){
+
+    ContainerEntrySetIterator(MapContainerImpl container, Iterator iter) {
         this.container = container;
         this.iter = iter;
     }
-    
-    public boolean hasNext(){
+
+    public boolean hasNext() {
         return iter.hasNext();
     }
 
-    public Object next(){
-        currentEntry =  (ContainerMapEntry) iter.next();
+    public Object next() {
+        currentEntry = (ContainerMapEntry)iter.next();
         return currentEntry;
     }
 
-    public void remove(){
-       if (currentEntry != null){
-           container.remove(currentEntry.getKey());
-       }
+    public void remove() {
+        if (currentEntry != null) {
+            container.remove(currentEntry.getKey());
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java Wed Aug  8 11:56:59 2007
@@ -25,27 +25,25 @@
 import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 
 /**
-* A Set of keys for the container
-* 
-* @version $Revision: 1.2 $
-*/
-public class ContainerKeySet extends ContainerCollectionSupport implements Set{
-  
-    
-    ContainerKeySet(MapContainerImpl container){
+ * A Set of keys for the container
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerKeySet extends ContainerCollectionSupport implements Set {
+
+    ContainerKeySet(MapContainerImpl container) {
         super(container);
     }
-    
-    
-    public boolean contains(Object o){
+
+    public boolean contains(Object o) {
         return container.containsKey(o);
     }
 
-    public Iterator iterator(){
+    public Iterator iterator() {
         return new ContainerKeySetIterator(container);
     }
 
-    public Object[] toArray(){
+    public Object[] toArray() {
         List list = new ArrayList();
         IndexItem item = container.getInternalList().getRoot();
         while ((item = container.getInternalList().getNextEntry(item)) != null) {
@@ -54,7 +52,7 @@
         return list.toArray();
     }
 
-    public Object[] toArray(Object[] a){
+    public Object[] toArray(Object[] a) {
         List list = new ArrayList();
         IndexItem item = container.getInternalList().getRoot();
         while ((item = container.getInternalList().getNextEntry(item)) != null) {
@@ -63,58 +61,58 @@
         return list.toArray(a);
     }
 
-    public boolean add(Object o){
+    public boolean add(Object o) {
         throw new UnsupportedOperationException("Cannot add here");
     }
 
-    public boolean remove(Object o){
-       return container.remove(o) != null;
+    public boolean remove(Object o) {
+        return container.remove(o) != null;
     }
 
-    public boolean containsAll(Collection c){
+    public boolean containsAll(Collection c) {
         boolean result = true;
-        for (Object key:c) {
-            if (!(result&=container.containsKey(key))) {
+        for (Object key : c) {
+            if (!(result &= container.containsKey(key))) {
                 break;
             }
         }
-       return result;
+        return result;
     }
 
-    public boolean addAll(Collection c){
+    public boolean addAll(Collection c) {
         throw new UnsupportedOperationException("Cannot add here");
     }
 
-    public boolean retainAll(Collection c){
+    public boolean retainAll(Collection c) {
         List tmpList = new ArrayList();
-        for (Iterator i = c.iterator(); i.hasNext(); ){
+        for (Iterator i = c.iterator(); i.hasNext();) {
             Object o = i.next();
-            if (!contains(o)){
+            if (!contains(o)) {
                 tmpList.add(o);
-            }  
+            }
         }
-        for(Iterator i = tmpList.iterator(); i.hasNext();){
+        for (Iterator i = tmpList.iterator(); i.hasNext();) {
             remove(i.next());
         }
         return !tmpList.isEmpty();
     }
 
-    public boolean removeAll(Collection c){
+    public boolean removeAll(Collection c) {
         boolean result = true;
-        for (Iterator i = c.iterator(); i.hasNext(); ){
-            if (!remove(i.next())){
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            if (!remove(i.next())) {
                 result = false;
             }
         }
         return result;
     }
 
-    public void clear(){
-      container.clear();
+    public void clear() {
+        container.clear();
     }
-    
+
     public String toString() {
-        StringBuffer result =new StringBuffer(32);
+        StringBuffer result = new StringBuffer(32);
         result.append("ContainerKeySet[");
         IndexItem item = container.getInternalList().getRoot();
         while ((item = container.getInternalList().getNextEntry(item)) != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java Wed Aug  8 11:56:59 2007
@@ -20,38 +20,37 @@
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 
-
 /**
-*Iterator for the set of keys for a container
-* 
-* @version $Revision: 1.2 $
-*/
-public class ContainerKeySetIterator implements Iterator{
+ * Iterator for the set of keys for a container
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerKeySetIterator implements Iterator {
     private MapContainerImpl container;
     private IndexLinkedList list;
     protected IndexItem nextItem;
     protected IndexItem currentItem;
-   
-    ContainerKeySetIterator(MapContainerImpl container){
+
+    ContainerKeySetIterator(MapContainerImpl container) {
         this.container = container;
-        this.list=container.getInternalList();
-        this.currentItem=list.getRoot();
-        this.nextItem=list.getNextEntry(currentItem);
+        this.list = container.getInternalList();
+        this.currentItem = list.getRoot();
+        this.nextItem = list.getNextEntry(currentItem);
     }
-    
-    public boolean hasNext(){
-        return nextItem!=null;
+
+    public boolean hasNext() {
+        return nextItem != null;
     }
 
-    public Object next(){
-        currentItem=nextItem;
-        Object result=container.getKey(nextItem);
-        nextItem=list.getNextEntry(nextItem);
+    public Object next() {
+        currentItem = nextItem;
+        Object result = container.getKey(nextItem);
+        nextItem = list.getNextEntry(nextItem);
         return result;
     }
 
-    public void remove(){
-        if(currentItem!=null){
+    public void remove() {
+        if (currentItem != null) {
             container.remove(currentItem);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java Wed Aug  8 11:56:59 2007
@@ -22,10 +22,10 @@
 /**
  * @version $Revision: 1.2 $
  */
-public class ContainerListIterator extends ContainerValueCollectionIterator implements ListIterator{
+public class ContainerListIterator extends ContainerValueCollectionIterator implements ListIterator {
 
-    protected ContainerListIterator(ListContainerImpl container,IndexLinkedList list,IndexItem start){
-        super(container,list,start);
+    protected ContainerListIterator(ListContainerImpl container, IndexLinkedList list, IndexItem start) {
+        super(container, list, start);
     }
 
     /*
@@ -33,10 +33,10 @@
      * 
      * @see java.util.ListIterator#hasPrevious()
      */
-    public boolean hasPrevious(){
-        synchronized(container){
-            nextItem=(IndexItem)list.refreshEntry(nextItem);
-            return list.getPrevEntry(nextItem)!=null;
+    public boolean hasPrevious() {
+        synchronized (container) {
+            nextItem = (IndexItem)list.refreshEntry(nextItem);
+            return list.getPrevEntry(nextItem) != null;
         }
     }
 
@@ -45,11 +45,11 @@
      * 
      * @see java.util.ListIterator#previous()
      */
-    public Object previous(){
-        synchronized(container){
-            nextItem=(IndexItem)list.refreshEntry(nextItem);
-            nextItem=list.getPrevEntry(nextItem);
-            return nextItem!=null?container.getValue(nextItem):null;
+    public Object previous() {
+        synchronized (container) {
+            nextItem = (IndexItem)list.refreshEntry(nextItem);
+            nextItem = list.getPrevEntry(nextItem);
+            return nextItem != null ? container.getValue(nextItem) : null;
         }
     }
 
@@ -58,14 +58,14 @@
      * 
      * @see java.util.ListIterator#nextIndex()
      */
-    public int nextIndex(){
-        int result=-1;
-        if(nextItem!=null){
-            synchronized(container){
-                nextItem=(IndexItem)list.refreshEntry(nextItem);
-                StoreEntry next=list.getNextEntry(nextItem);
-                if(next!=null){
-                    result=container.getInternalList().indexOf(next);
+    public int nextIndex() {
+        int result = -1;
+        if (nextItem != null) {
+            synchronized (container) {
+                nextItem = (IndexItem)list.refreshEntry(nextItem);
+                StoreEntry next = list.getNextEntry(nextItem);
+                if (next != null) {
+                    result = container.getInternalList().indexOf(next);
                 }
             }
         }
@@ -77,14 +77,14 @@
      * 
      * @see java.util.ListIterator#previousIndex()
      */
-    public int previousIndex(){
-        int result=-1;
-        if(nextItem!=null){
-            synchronized(container){
-                nextItem=(IndexItem)list.refreshEntry(nextItem);
-                StoreEntry prev=list.getPrevEntry(nextItem);
-                if(prev!=null){
-                    result=container.getInternalList().indexOf(prev);
+    public int previousIndex() {
+        int result = -1;
+        if (nextItem != null) {
+            synchronized (container) {
+                nextItem = (IndexItem)list.refreshEntry(nextItem);
+                StoreEntry prev = list.getPrevEntry(nextItem);
+                if (prev != null) {
+                    result = container.getInternalList().indexOf(prev);
                 }
             }
         }
@@ -96,9 +96,9 @@
      * 
      * @see java.util.ListIterator#set(E)
      */
-    public void set(Object o){
-        IndexItem item=((ListContainerImpl)container).internalSet(previousIndex()+1,o);
-        nextItem=item;
+    public void set(Object o) {
+        IndexItem item = ((ListContainerImpl)container).internalSet(previousIndex() + 1, o);
+        nextItem = item;
     }
 
     /*
@@ -106,8 +106,8 @@
      * 
      * @see java.util.ListIterator#add(E)
      */
-    public void add(Object o){
-        IndexItem item=((ListContainerImpl)container).internalAdd(previousIndex()+1,o);
-        nextItem=item;
+    public void add(Object o) {
+        IndexItem item = ((ListContainerImpl)container).internalAdd(previousIndex() + 1, o);
+        nextItem = item;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java Wed Aug  8 11:56:59 2007
@@ -20,34 +20,30 @@
 import org.apache.activemq.kaha.MapContainer;
 
 /**
-* Map.Entry implementation for a container
-* 
-* @version $Revision: 1.2 $
-*/
+ * Map.Entry implementation for a container
+ * 
+ * @version $Revision: 1.2 $
+ */
 class ContainerMapEntry implements Map.Entry {
 
     private MapContainer container;
     private Object key;
-   
-    ContainerMapEntry(MapContainer container,Object key){
+
+    ContainerMapEntry(MapContainer container, Object key) {
         this.container = container;
         this.key = key;
-      
+
     }
-    
-    
-    public Object getKey(){
+
+    public Object getKey() {
         return key;
     }
 
-    public Object getValue(){
+    public Object getValue() {
         return container.get(key);
     }
 
-    public Object setValue(Object value){
+    public Object setValue(Object value) {
         return container.put(key, value);
     }
 }
-   
-
-    

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java Wed Aug  8 11:56:59 2007
@@ -26,59 +26,53 @@
 import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 
 /**
-* Values collection for the MapContainer
-* 
-* @version $Revision: 1.2 $
-*/
-class ContainerValueCollection extends ContainerCollectionSupport implements Collection{
-    
-   
-    ContainerValueCollection(MapContainerImpl container){
+ * Values collection for the MapContainer
+ * 
+ * @version $Revision: 1.2 $
+ */
+class ContainerValueCollection extends ContainerCollectionSupport implements Collection {
+
+    ContainerValueCollection(MapContainerImpl container) {
         super(container);
     }
-    
-    
-    
-    public boolean contains(Object o){
+
+    public boolean contains(Object o) {
         return container.containsValue(o);
     }
 
-    
-    public Iterator iterator(){
-        IndexLinkedList list=container.getItemList();
-        return new ContainerValueCollectionIterator(container,list,list.getRoot());
+    public Iterator iterator() {
+        IndexLinkedList list = container.getItemList();
+        return new ContainerValueCollectionIterator(container, list, list.getRoot());
     }
 
-   
-    public Object[] toArray(){
+    public Object[] toArray() {
         Object[] result = null;
         IndexLinkedList list = container.getItemList();
-        synchronized(list){
+        synchronized (list) {
             result = new Object[list.size()];
             IndexItem item = list.getFirst();
             int count = 0;
-            while (item != null){
-                Object value=container.getValue(item);  
+            while (item != null) {
+                Object value = container.getValue(item);
                 result[count++] = value;
-                
+
                 item = list.getNextEntry(item);
             }
-           
-            
+
         }
         return result;
     }
 
-    public Object[] toArray(Object[] result){
-        IndexLinkedList list=container.getItemList();
-        synchronized(list){
-            if(result.length<=list.size()){
+    public Object[] toArray(Object[] result) {
+        IndexLinkedList list = container.getItemList();
+        synchronized (list) {
+            if (result.length <= list.size()) {
                 IndexItem item = list.getFirst();
                 int count = 0;
-                while (item != null){
-                    Object value=container.getValue(item);  
+                while (item != null) {
+                    Object value = container.getValue(item);
                     result[count++] = value;
-                    
+
                     item = list.getNextEntry(item);
                 }
             }
@@ -86,21 +80,18 @@
         return result;
     }
 
-    
-    public boolean add(Object o){
-        throw  new UnsupportedOperationException("Can't add an object here");
+    public boolean add(Object o) {
+        throw new UnsupportedOperationException("Can't add an object here");
     }
 
-   
-    public boolean remove(Object o){
+    public boolean remove(Object o) {
         return container.removeValue(o);
     }
 
-    
-    public boolean containsAll(Collection c){
+    public boolean containsAll(Collection c) {
         boolean result = !c.isEmpty();
-        for (Iterator i = c.iterator(); i.hasNext(); ){
-            if (!contains(i.next())){
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            if (!contains(i.next())) {
                 result = false;
                 break;
             }
@@ -108,38 +99,34 @@
         return result;
     }
 
-   
-    public boolean addAll(Collection c){
-        throw  new UnsupportedOperationException("Can't add everything here!");
+    public boolean addAll(Collection c) {
+        throw new UnsupportedOperationException("Can't add everything here!");
     }
 
-    
-    public boolean removeAll(Collection c){
+    public boolean removeAll(Collection c) {
         boolean result = true;
-        for (Iterator i = c.iterator(); i.hasNext(); ){
+        for (Iterator i = c.iterator(); i.hasNext();) {
             Object obj = i.next();
-                result&=remove(obj);
+            result &= remove(obj);
         }
         return result;
     }
 
-    
-    public boolean retainAll(Collection c){
-       List tmpList = new ArrayList();
-       for (Iterator i = c.iterator(); i.hasNext(); ){
-           Object o = i.next();
-           if (!contains(o)){
-               tmpList.add(o);
-           }  
-       }
-       for(Iterator i = tmpList.iterator(); i.hasNext();){
-           remove(i.next());
-       }
-       return !tmpList.isEmpty();
-    }
-
-   
-    public void clear(){
-     container.clear();
+    public boolean retainAll(Collection c) {
+        List tmpList = new ArrayList();
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            Object o = i.next();
+            if (!contains(o)) {
+                tmpList.add(o);
+            }
+        }
+        for (Iterator i = tmpList.iterator(); i.hasNext();) {
+            remove(i.next());
+        }
+        return !tmpList.isEmpty();
+    }
+
+    public void clear() {
+        container.clear();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java Wed Aug  8 11:56:59 2007
@@ -23,38 +23,38 @@
  * 
  * @version $Revision: 1.2 $
  */
-public class ContainerValueCollectionIterator implements Iterator{
+public class ContainerValueCollectionIterator implements Iterator {
 
     protected BaseContainerImpl container;
     protected IndexLinkedList list;
     protected IndexItem nextItem;
     protected IndexItem currentItem;
 
-    ContainerValueCollectionIterator(BaseContainerImpl container,IndexLinkedList list,IndexItem start){
-        this.container=container;
-        this.list=list;
-        this.currentItem=start;
-        this.nextItem=list.getNextEntry((IndexItem)list.refreshEntry(start));
+    ContainerValueCollectionIterator(BaseContainerImpl container, IndexLinkedList list, IndexItem start) {
+        this.container = container;
+        this.list = list;
+        this.currentItem = start;
+        this.nextItem = list.getNextEntry((IndexItem)list.refreshEntry(start));
     }
 
-    public boolean hasNext(){
-        return nextItem!=null;
+    public boolean hasNext() {
+        return nextItem != null;
     }
 
-    public Object next(){
-        synchronized(container){
-            nextItem=(IndexItem)list.refreshEntry(nextItem);
-            currentItem=nextItem;
-            Object result=container.getValue(nextItem);
-            nextItem=list.getNextEntry(nextItem);
+    public Object next() {
+        synchronized (container) {
+            nextItem = (IndexItem)list.refreshEntry(nextItem);
+            currentItem = nextItem;
+            Object result = container.getValue(nextItem);
+            nextItem = list.getNextEntry(nextItem);
             return result;
         }
     }
 
-    public void remove(){
-        synchronized(container){
-            if(currentItem!=null){
-                currentItem=(IndexItem)list.refreshEntry(currentItem);
+    public void remove() {
+        synchronized (container) {
+            if (currentItem != null) {
+                currentItem = (IndexItem)list.refreshEntry(currentItem);
                 container.remove(currentItem);
             }
         }