You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [6/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java Wed Aug 8 11:56:59 2007
@@ -32,125 +32,124 @@
*/
public final class DataManagerFacade implements org.apache.activemq.kaha.impl.DataManager {
- private static class StoreLocationFacade implements StoreLocation {
- private final Location location;
+ private static class StoreLocationFacade implements StoreLocation {
+ private final Location location;
- public StoreLocationFacade(Location location) {
- this.location = location;
- }
-
- public int getFile() {
- return location.getDataFileId();
- }
-
- public long getOffset() {
- return location.getOffset();
- }
-
- public int getSize() {
- return location.getSize();
- }
-
- public Location getLocation() {
- return location;
- }
- }
-
- static private StoreLocation convertToStoreLocation(Location location) {
- if(location==null)
- return null;
- return new StoreLocationFacade(location);
- }
-
- static private Location convertFromStoreLocation(StoreLocation location) {
-
- if(location==null)
- return null;
-
- if( location.getClass()== StoreLocationFacade.class )
- return ((StoreLocationFacade)location).getLocation();
-
- Location l = new Location();
- l.setOffset((int) location.getOffset());
- l.setSize(location.getSize());
- l.setDataFileId(location.getFile());
- return l;
- }
-
- static final private ByteSequence FORCE_COMMAND = new ByteSequence(new byte[]{'F', 'O', 'R', 'C', 'E'});
-
- AsyncDataManager dataManager;
- private final String name;
- private Marshaller redoMarshaller;
-
-
- public DataManagerFacade(AsyncDataManager dataManager, String name) {
- this.dataManager=dataManager;
- this.name = name;
- }
-
- public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
- ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
- DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
+ public StoreLocationFacade(Location location) {
+ this.location = location;
+ }
+
+ public int getFile() {
+ return location.getDataFileId();
+ }
+
+ public long getOffset() {
+ return location.getOffset();
+ }
+
+ public int getSize() {
+ return location.getSize();
+ }
+
+ public Location getLocation() {
+ return location;
+ }
+ }
+
+ static private StoreLocation convertToStoreLocation(Location location) {
+ if (location == null)
+ return null;
+ return new StoreLocationFacade(location);
+ }
+
+ static private Location convertFromStoreLocation(StoreLocation location) {
+
+ if (location == null)
+ return null;
+
+ if (location.getClass() == StoreLocationFacade.class)
+ return ((StoreLocationFacade)location).getLocation();
+
+ Location l = new Location();
+ l.setOffset((int)location.getOffset());
+ l.setSize(location.getSize());
+ l.setDataFileId(location.getFile());
+ return l;
+ }
+
+ static final private ByteSequence FORCE_COMMAND = new ByteSequence(new byte[] {'F', 'O', 'R', 'C', 'E'});
+
+ AsyncDataManager dataManager;
+ private final String name;
+ private Marshaller redoMarshaller;
+
+ public DataManagerFacade(AsyncDataManager dataManager, String name) {
+ this.dataManager = dataManager;
+ this.name = name;
+ }
+
+ public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
+ ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
+ DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
return marshaller.readPayload(dataIn);
- }
+ }
+ public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+ final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+ marshaller.writePayload(payload, buffer);
+ ByteSequence data = buffer.toByteSequence();
+ return convertToStoreLocation(dataManager.write(data, (byte)1, false));
+ }
+
+ public void force() throws IOException {
+ dataManager.write(FORCE_COMMAND, (byte)2, true);
+ }
+
+ public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
+ final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+ marshaller.writePayload(payload, buffer);
+ ByteSequence data = buffer.toByteSequence();
+ dataManager.update(convertFromStoreLocation(location), data, false);
+ }
+
+ public void close() throws IOException {
+ dataManager.close();
+ }
+
+ public void consolidateDataFiles() throws IOException {
+ dataManager.consolidateDataFiles();
+ }
+
+ public boolean delete() throws IOException {
+ return dataManager.delete();
+ }
+
+ public void addInterestInFile(int file) throws IOException {
+ dataManager.addInterestInFile(file);
+ }
+
+ public void removeInterestInFile(int file) throws IOException {
+ dataManager.removeInterestInFile(file);
+ }
+
+ public void recoverRedoItems(RedoListener listener) throws IOException {
+ throw new RuntimeException("Not Implemented..");
+ }
+
+ public StoreLocation storeRedoItem(Object payload) throws IOException {
+ throw new RuntimeException("Not Implemented..");
+ }
+
+ public Marshaller getRedoMarshaller() {
+ return redoMarshaller;
+ }
+
+ public void setRedoMarshaller(Marshaller redoMarshaller) {
+ this.redoMarshaller = redoMarshaller;
+ }
+
+ public String getName() {
+ return name;
+ }
- public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
- final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
- marshaller.writePayload(payload,buffer);
- ByteSequence data = buffer.toByteSequence();
- return convertToStoreLocation(dataManager.write(data, (byte)1, false));
- }
-
-
- public void force() throws IOException {
- dataManager.write(FORCE_COMMAND, (byte)2, true);
- }
-
- public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
- final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
- marshaller.writePayload(payload,buffer);
- ByteSequence data = buffer.toByteSequence();
- dataManager.update(convertFromStoreLocation(location), data, false);
- }
-
- public void close() throws IOException {
- dataManager.close();
- }
-
- public void consolidateDataFiles() throws IOException {
- dataManager.consolidateDataFiles();
- }
-
- public boolean delete() throws IOException {
- return dataManager.delete();
- }
-
- public void addInterestInFile(int file) throws IOException {
- dataManager.addInterestInFile(file);
- }
- public void removeInterestInFile(int file) throws IOException {
- dataManager.removeInterestInFile(file);
- }
-
- public void recoverRedoItems(RedoListener listener) throws IOException {
- throw new RuntimeException("Not Implemented..");
- }
- public StoreLocation storeRedoItem(Object payload) throws IOException {
- throw new RuntimeException("Not Implemented..");
- }
-
- public Marshaller getRedoMarshaller() {
- return redoMarshaller;
- }
- public void setRedoMarshaller(Marshaller redoMarshaller) {
- this.redoMarshaller = redoMarshaller;
- }
-
- public String getName() {
- return name;
- }
-
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java Wed Aug 8 11:56:59 2007
@@ -33,75 +33,77 @@
*/
public final class JournalFacade implements Journal {
-
- public static class RecordLocationFacade implements RecordLocation {
- private final Location location;
-
- public RecordLocationFacade(Location location) {
- this.location = location;
- }
-
- public Location getLocation() {
- return location;
- }
-
- public int compareTo(Object o) {
- RecordLocationFacade rlf = (RecordLocationFacade)o;
- int rc = location.compareTo(rlf.location);
- return rc;
- }
- }
-
- static private RecordLocation convertToRecordLocation(Location location) {
- if(location==null)
- return null;
- return new RecordLocationFacade(location);
- }
-
- static private Location convertFromRecordLocation(RecordLocation location) {
-
- if(location==null)
- return null;
-
- return ((RecordLocationFacade)location).getLocation();
- }
-
- AsyncDataManager dataManager;
-
- public JournalFacade(AsyncDataManager dataManager) {
- this.dataManager = dataManager;
- }
-
- public void close() throws IOException {
- dataManager.close();
- }
-
- public RecordLocation getMark() throws IllegalStateException {
- return convertToRecordLocation(dataManager.getMark());
- }
-
- public RecordLocation getNextRecordLocation(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
- return convertToRecordLocation(dataManager.getNextLocation(convertFromRecordLocation(location)));
- }
-
- public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
- ByteSequence rc = dataManager.read(convertFromRecordLocation(location));
- if( rc == null )
- return null;
- return new ByteArrayPacket(rc.getData(), rc.getOffset(), rc.getLength());
- }
-
- public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
- }
-
- public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException, IOException, IllegalStateException {
- dataManager.setMark(convertFromRecordLocation(location), sync);
- }
-
- public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
- org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
- ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
- return convertToRecordLocation(dataManager.write(sequence, sync));
- }
-
+ public static class RecordLocationFacade implements RecordLocation {
+ private final Location location;
+
+ public RecordLocationFacade(Location location) {
+ this.location = location;
+ }
+
+ public Location getLocation() {
+ return location;
+ }
+
+ public int compareTo(Object o) {
+ RecordLocationFacade rlf = (RecordLocationFacade)o;
+ int rc = location.compareTo(rlf.location);
+ return rc;
+ }
+ }
+
+ static private RecordLocation convertToRecordLocation(Location location) {
+ if (location == null)
+ return null;
+ return new RecordLocationFacade(location);
+ }
+
+ static private Location convertFromRecordLocation(RecordLocation location) {
+
+ if (location == null)
+ return null;
+
+ return ((RecordLocationFacade)location).getLocation();
+ }
+
+ AsyncDataManager dataManager;
+
+ public JournalFacade(AsyncDataManager dataManager) {
+ this.dataManager = dataManager;
+ }
+
+ public void close() throws IOException {
+ dataManager.close();
+ }
+
+ public RecordLocation getMark() throws IllegalStateException {
+ return convertToRecordLocation(dataManager.getMark());
+ }
+
+ public RecordLocation getNextRecordLocation(RecordLocation location)
+ throws InvalidRecordLocationException, IOException, IllegalStateException {
+ return convertToRecordLocation(dataManager.getNextLocation(convertFromRecordLocation(location)));
+ }
+
+ public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException,
+ IllegalStateException {
+ ByteSequence rc = dataManager.read(convertFromRecordLocation(location));
+ if (rc == null)
+ return null;
+ return new ByteArrayPacket(rc.getData(), rc.getOffset(), rc.getLength());
+ }
+
+ public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
+ }
+
+ public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException,
+ IOException, IllegalStateException {
+ dataManager.setMark(convertFromRecordLocation(location), sync);
+ }
+
+ public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
+ org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
+ ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
+ return convertToRecordLocation(dataManager.write(sequence, sync));
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java Wed Aug 8 11:56:59 2007
@@ -27,121 +27,124 @@
* @version $Revision: 1.2 $
*/
public final class Location implements Comparable<Location> {
-
- public static final byte MARK_TYPE=-1;
- public static final byte USER_TYPE=1;
- public static final byte NOT_SET_TYPE=0;
- public static final int NOT_SET=-1;
-
- private int dataFileId=NOT_SET;
- private int offset=NOT_SET;
- private int size=NOT_SET;
- private byte type=NOT_SET_TYPE;
+
+ public static final byte MARK_TYPE = -1;
+ public static final byte USER_TYPE = 1;
+ public static final byte NOT_SET_TYPE = 0;
+ public static final int NOT_SET = -1;
+
+ private int dataFileId = NOT_SET;
+ private int offset = NOT_SET;
+ private int size = NOT_SET;
+ private byte type = NOT_SET_TYPE;
private CountDownLatch latch;
- public Location(){}
-
+ public Location() {
+ }
+
Location(Location item) {
this.dataFileId = item.dataFileId;
this.offset = item.offset;
this.size = item.size;
this.type = item.type;
}
-
- boolean isValid(){
+
+ boolean isValid() {
return dataFileId != NOT_SET;
}
/**
* @return the size of the data record including the header.
*/
- public int getSize(){
+ public int getSize() {
return size;
}
/**
* @param size the size of the data record including the header.
*/
- public void setSize(int size){
- this.size=size;
+ public void setSize(int size) {
+ this.size = size;
}
/**
* @return the size of the payload of the record.
*/
public int getPaylodSize() {
- return size-AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
- }
-
- public int getOffset(){
+ return size - AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+ }
+
+ public int getOffset() {
return offset;
}
- public void setOffset(int offset){
- this.offset=offset;
+
+ public void setOffset(int offset) {
+ this.offset = offset;
}
- public int getDataFileId(){
+ public int getDataFileId() {
return dataFileId;
}
- public void setDataFileId(int file){
- this.dataFileId=file;
+ public void setDataFileId(int file) {
+ this.dataFileId = file;
}
- public byte getType() {
- return type;
- }
+ public byte getType() {
+ return type;
+ }
- public void setType(byte type) {
- this.type = type;
- }
+ public void setType(byte type) {
+ this.type = type;
+ }
- public String toString(){
- String result="offset = "+offset+", file = " + dataFileId + ", size = "+size + ", type = "+type;
+ public String toString() {
+ String result = "offset = " + offset + ", file = " + dataFileId + ", size = " + size + ", type = "
+ + type;
return result;
}
- public void writeExternal(DataOutput dos) throws IOException {
- dos.writeInt(dataFileId);
- dos.writeInt(offset);
- dos.writeInt(size);
- dos.writeByte(type);
- }
-
- public void readExternal(DataInput dis) throws IOException {
- dataFileId = dis.readInt();
- offset = dis.readInt();
- size = dis.readInt();
- type = dis.readByte();
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public int compareTo(Location o) {
- Location l = (Location)o;
- if( dataFileId == l.dataFileId ) {
- int rc = offset-l.offset;
- return rc;
- }
- return dataFileId - l.dataFileId;
- }
-
+ public void writeExternal(DataOutput dos) throws IOException {
+ dos.writeInt(dataFileId);
+ dos.writeInt(offset);
+ dos.writeInt(size);
+ dos.writeByte(type);
+ }
+
+ public void readExternal(DataInput dis) throws IOException {
+ dataFileId = dis.readInt();
+ offset = dis.readInt();
+ size = dis.readInt();
+ type = dis.readByte();
+ }
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public int compareTo(Location o) {
+ Location l = (Location)o;
+ if (dataFileId == l.dataFileId) {
+ int rc = offset - l.offset;
+ return rc;
+ }
+ return dataFileId - l.dataFileId;
+ }
+
public boolean equals(Object o) {
boolean result = false;
if (o instanceof Location) {
- result = compareTo((Location)o)==0;
+ result = compareTo((Location)o) == 0;
}
return result;
}
-
+
public int hashCode() {
return dataFileId ^ offset;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Wed Aug 8 11:56:59 2007
@@ -22,191 +22,194 @@
import java.nio.channels.FileChannel;
/**
- * An AsyncDataFileAppender that uses NIO ByteBuffers and File chanels to more efficently
- * copy data to files.
+ * An AsyncDataFileAppender that uses NIO ByteBuffers and File channels to more
+ * efficiently copy data to files.
*
* @version $Revision: 1.1.1.1 $
*/
class NIODataFileAppender extends DataFileAppender {
-
+
public NIODataFileAppender(AsyncDataManager fileManager) {
- super(fileManager);
- }
+ super(fileManager);
+ }
- /**
- * The async processing loop that writes to the data files and
- * does the force calls.
+ /**
+ * The async processing loop that writes to the data files and does the
+ * force calls.
*
- * Since the file sync() call is the slowest of all the operations,
- * this algorithm tries to 'batch' or group together several file sync() requests
- * into a single file sync() call. The batching is accomplished attaching the
- * same CountDownLatch instance to every force request in a group.
+ * Since the file sync() call is the slowest of all the operations, this
+ * algorithm tries to 'batch' or group together several file sync() requests
+ * into a single file sync() call. The batching is accomplished attaching
+ * the same CountDownLatch instance to every force request in a group.
*
*/
protected void processQueue() {
- DataFile dataFile=null;
- RandomAccessFile file=null;
- FileChannel channel=null;
-
- try {
-
- ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
- ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
- ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);
-
- // Populate the static parts of the headers and footers..
- header.putInt(0); // size
- header.put((byte) 0); // type
- header.put(RESERVED_SPACE); // reserved
- header.put(AsyncDataManager.ITEM_HEAD_SOR);
- footer.put(AsyncDataManager.ITEM_HEAD_EOR);
-
- while( true ) {
-
- Object o = null;
-
- // Block till we get a command.
- synchronized(enqueueMutex) {
- while( true ) {
- if( shutdown ) {
- o = SHUTDOWN_COMMAND;
- break;
- }
- if( nextWriteBatch!=null ) {
- o = nextWriteBatch;
- nextWriteBatch=null;
- break;
- }
- enqueueMutex.wait();
- }
- enqueueMutex.notify();
- }
-
-
- if( o == SHUTDOWN_COMMAND ) {
- break;
- }
-
- WriteBatch wb = (WriteBatch) o;
- if( dataFile != wb.dataFile ) {
- if( file!=null ) {
- dataFile.closeRandomAccessFile(file);
- }
- dataFile = wb.dataFile;
- file = dataFile.openRandomAccessFile(true);
- channel = file.getChannel();
- }
-
- WriteCommand write = wb.first;
-
- // Write all the data.
- // Only need to seek to first location.. all others
- // are in sequence.
- file.seek(write.location.getOffset());
-
- //
- // is it just 1 big write?
- if( wb.size == write.location.getSize() ) {
-
- header.clear();
- header.putInt(write.location.getSize());
- header.put(write.location.getType());
- header.clear();
- transfer(header, channel);
- ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
- transfer(source, channel);
- footer.clear();
- transfer(footer, channel);
-
- } else {
-
- // Combine the smaller writes into 1 big buffer
- while( write!=null ) {
-
- header.clear();
- header.putInt(write.location.getSize());
- header.put(write.location.getType());
- header.clear();
- copy(header, buffer);
- assert !header.hasRemaining();
-
- ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
- copy(source, buffer);
- assert !source.hasRemaining();
-
- footer.clear();
- copy(footer, buffer);
- assert !footer.hasRemaining();
-
- write = (WriteCommand) write.getNext();
- }
-
- // Fully write out the buffer..
- buffer.flip();
- transfer(buffer, channel);
- buffer.clear();
- }
-
- file.getChannel().force(false);
-
- WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
- dataManager.setLastAppendLocation( lastWrite.location );
-
- // Signal any waiting threads that the write is on disk.
- if( wb.latch!=null ) {
- wb.latch.countDown();
- }
-
- // Now that the data is on disk, remove the writes from the in flight
- // cache.
- write = wb.first;
- while( write!=null ) {
- if( !write.sync ) {
- inflightWrites.remove(new WriteKey(write.location));
- }
- write = (WriteCommand) write.getNext();
- }
- }
-
- } catch (IOException e) {
- synchronized( enqueueMutex ) {
- firstAsyncException = e;
- }
- } catch (InterruptedException e) {
- } finally {
- try {
- if( file!=null ) {
- dataFile.closeRandomAccessFile(file);
- }
- } catch (IOException e) {
- }
- shutdownDone.countDown();
- }
+ DataFile dataFile = null;
+ RandomAccessFile file = null;
+ FileChannel channel = null;
+
+ try {
+
+ ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
+ ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);
+
+ // Populate the static parts of the headers and footers..
+ header.putInt(0); // size
+ header.put((byte)0); // type
+ header.put(RESERVED_SPACE); // reserved
+ header.put(AsyncDataManager.ITEM_HEAD_SOR);
+ footer.put(AsyncDataManager.ITEM_HEAD_EOR);
+
+ while (true) {
+
+ Object o = null;
+
+ // Block till we get a command.
+ synchronized (enqueueMutex) {
+ while (true) {
+ if (shutdown) {
+ o = SHUTDOWN_COMMAND;
+ break;
+ }
+ if (nextWriteBatch != null) {
+ o = nextWriteBatch;
+ nextWriteBatch = null;
+ break;
+ }
+ enqueueMutex.wait();
+ }
+ enqueueMutex.notify();
+ }
+
+ if (o == SHUTDOWN_COMMAND) {
+ break;
+ }
+
+ WriteBatch wb = (WriteBatch)o;
+ if (dataFile != wb.dataFile) {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ dataFile = wb.dataFile;
+ file = dataFile.openRandomAccessFile(true);
+ channel = file.getChannel();
+ }
+
+ WriteCommand write = wb.first;
+
+ // Write all the data.
+ // Only need to seek to first location.. all others
+ // are in sequence.
+ file.seek(write.location.getOffset());
+
+ //
+ // is it just 1 big write?
+ if (wb.size == write.location.getSize()) {
+
+ header.clear();
+ header.putInt(write.location.getSize());
+ header.put(write.location.getType());
+ header.clear();
+ transfer(header, channel);
+ ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+ write.data.getLength());
+ transfer(source, channel);
+ footer.clear();
+ transfer(footer, channel);
+
+ } else {
+
+ // Combine the smaller writes into 1 big buffer
+ while (write != null) {
+
+ header.clear();
+ header.putInt(write.location.getSize());
+ header.put(write.location.getType());
+ header.clear();
+ copy(header, buffer);
+ assert !header.hasRemaining();
+
+ ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+ write.data.getLength());
+ copy(source, buffer);
+ assert !source.hasRemaining();
+
+ footer.clear();
+ copy(footer, buffer);
+ assert !footer.hasRemaining();
+
+ write = (WriteCommand)write.getNext();
+ }
+
+ // Fully write out the buffer..
+ buffer.flip();
+ transfer(buffer, channel);
+ buffer.clear();
+ }
+
+ file.getChannel().force(false);
+
+ WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+ dataManager.setLastAppendLocation(lastWrite.location);
+
+ // Signal any waiting threads that the write is on disk.
+ if (wb.latch != null) {
+ wb.latch.countDown();
+ }
+
+ // Now that the data is on disk, remove the writes from the in
+ // flight
+ // cache.
+ write = wb.first;
+ while (write != null) {
+ if (!write.sync) {
+ inflightWrites.remove(new WriteKey(write.location));
+ }
+ write = (WriteCommand)write.getNext();
+ }
+ }
+
+ } catch (IOException e) {
+ synchronized (enqueueMutex) {
+ firstAsyncException = e;
+ }
+ } catch (InterruptedException e) {
+ } finally {
+ try {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ } catch (IOException e) {
+ }
+ shutdownDone.countDown();
+ }
}
/**
* Copy the bytes in header to the channel.
+ *
* @param header - source of data
* @param channel - destination where the data will be written.
* @throws IOException
*/
- private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
- while (header.hasRemaining()) {
- channel.write(header);
- }
- }
-
- private int copy(ByteBuffer src, ByteBuffer dest) {
- int rc = Math.min(dest.remaining(), src.remaining());
- if( rc > 0 ) {
- // Adjust our limit so that we don't overflow the dest buffer.
- int limit = src.limit();
- src.limit(src.position()+rc);
+ private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
+ while (header.hasRemaining()) {
+ channel.write(header);
+ }
+ }
+
+ private int copy(ByteBuffer src, ByteBuffer dest) {
+ int rc = Math.min(dest.remaining(), src.remaining());
+ if (rc > 0) {
+ // Adjust our limit so that we don't overflow the dest buffer.
+ int limit = src.limit();
+ src.limit(src.position() + rc);
dest.put(src);
// restore the limit.
- src.limit(limit);
- }
- return rc;
- }
-
+ src.limit(limit);
+ }
+ return rc;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySet.java Wed Aug 8 11:56:59 2007
@@ -24,85 +24,85 @@
import java.util.Set;
/**
-* Set of Map.Entry objects for a container
-*
-* @version $Revision: 1.2 $
-*/
-public class ContainerEntrySet extends ContainerCollectionSupport implements Set{
- ContainerEntrySet(MapContainerImpl container){
+ * Set of Map.Entry objects for a container
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ContainerEntrySet extends ContainerCollectionSupport implements Set {
+ ContainerEntrySet(MapContainerImpl container) {
super(container);
}
- public boolean contains(Object o){
+ public boolean contains(Object o) {
return container.entrySet().contains(o);
}
- public Iterator iterator(){
- return new ContainerEntrySetIterator(container,buildEntrySet().iterator());
+ public Iterator iterator() {
+ return new ContainerEntrySetIterator(container, buildEntrySet().iterator());
}
- public Object[] toArray(){
+ public Object[] toArray() {
return buildEntrySet().toArray();
}
- public Object[] toArray(Object[] a){
+ public Object[] toArray(Object[] a) {
return buildEntrySet().toArray(a);
}
- public boolean add(Object o){
+ public boolean add(Object o) {
throw new UnsupportedOperationException("Cannot add here");
}
- public boolean remove(Object o){
- boolean result=false;
- if(buildEntrySet().remove(o)){
- ContainerMapEntry entry=(ContainerMapEntry) o;
+ public boolean remove(Object o) {
+ boolean result = false;
+ if (buildEntrySet().remove(o)) {
+ ContainerMapEntry entry = (ContainerMapEntry)o;
container.remove(entry.getKey());
}
return result;
}
- public boolean containsAll(Collection c){
+ public boolean containsAll(Collection c) {
return buildEntrySet().containsAll(c);
}
- public boolean addAll(Collection c){
+ public boolean addAll(Collection c) {
throw new UnsupportedOperationException("Cannot add here");
}
- public boolean retainAll(Collection c){
- List tmpList=new ArrayList();
- for(Iterator i=c.iterator();i.hasNext();){
- Object o=i.next();
- if(!contains(o)){
+ public boolean retainAll(Collection c) {
+ List tmpList = new ArrayList();
+ for (Iterator i = c.iterator(); i.hasNext();) {
+ Object o = i.next();
+ if (!contains(o)) {
tmpList.add(o);
}
}
- boolean result=false;
- for(Iterator i=tmpList.iterator();i.hasNext();){
- result|=remove(i.next());
+ boolean result = false;
+ for (Iterator i = tmpList.iterator(); i.hasNext();) {
+ result |= remove(i.next());
}
return result;
}
- public boolean removeAll(Collection c){
- boolean result=true;
- for(Iterator i=c.iterator();i.hasNext();){
- if(!remove(i.next())){
- result=false;
+ public boolean removeAll(Collection c) {
+ boolean result = true;
+ for (Iterator i = c.iterator(); i.hasNext();) {
+ if (!remove(i.next())) {
+ result = false;
}
}
return result;
}
- public void clear(){
+ public void clear() {
container.clear();
}
- protected Set buildEntrySet(){
- Set set=new HashSet();
- for(Iterator i=container.keySet().iterator();i.hasNext();){
- ContainerMapEntry entry=new ContainerMapEntry(container,i.next());
+ protected Set buildEntrySet() {
+ Set set = new HashSet();
+ for (Iterator i = container.keySet().iterator(); i.hasNext();) {
+ ContainerMapEntry entry = new ContainerMapEntry(container, i.next());
set.add(entry);
}
return set;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerEntrySetIterator.java Wed Aug 8 11:56:59 2007
@@ -18,33 +18,33 @@
import java.util.Iterator;
-
/**
-* An Iterator for a container entry Set
-*
-* @version $Revision: 1.2 $
-*/
-public class ContainerEntrySetIterator implements Iterator{
+ * An Iterator for a container entry Set
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ContainerEntrySetIterator implements Iterator {
private MapContainerImpl container;
- private Iterator iter;
+ private Iterator iter;
private ContainerMapEntry currentEntry;
- ContainerEntrySetIterator(MapContainerImpl container,Iterator iter){
+
+ ContainerEntrySetIterator(MapContainerImpl container, Iterator iter) {
this.container = container;
this.iter = iter;
}
-
- public boolean hasNext(){
+
+ public boolean hasNext() {
return iter.hasNext();
}
- public Object next(){
- currentEntry = (ContainerMapEntry) iter.next();
+ public Object next() {
+ currentEntry = (ContainerMapEntry)iter.next();
return currentEntry;
}
- public void remove(){
- if (currentEntry != null){
- container.remove(currentEntry.getKey());
- }
+ public void remove() {
+ if (currentEntry != null) {
+ container.remove(currentEntry.getKey());
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java Wed Aug 8 11:56:59 2007
@@ -25,27 +25,25 @@
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
/**
-* A Set of keys for the container
-*
-* @version $Revision: 1.2 $
-*/
-public class ContainerKeySet extends ContainerCollectionSupport implements Set{
-
-
- ContainerKeySet(MapContainerImpl container){
+ * A Set of keys for the container
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ContainerKeySet extends ContainerCollectionSupport implements Set {
+
+ ContainerKeySet(MapContainerImpl container) {
super(container);
}
-
-
- public boolean contains(Object o){
+
+ public boolean contains(Object o) {
return container.containsKey(o);
}
- public Iterator iterator(){
+ public Iterator iterator() {
return new ContainerKeySetIterator(container);
}
- public Object[] toArray(){
+ public Object[] toArray() {
List list = new ArrayList();
IndexItem item = container.getInternalList().getRoot();
while ((item = container.getInternalList().getNextEntry(item)) != null) {
@@ -54,7 +52,7 @@
return list.toArray();
}
- public Object[] toArray(Object[] a){
+ public Object[] toArray(Object[] a) {
List list = new ArrayList();
IndexItem item = container.getInternalList().getRoot();
while ((item = container.getInternalList().getNextEntry(item)) != null) {
@@ -63,58 +61,58 @@
return list.toArray(a);
}
- public boolean add(Object o){
+ public boolean add(Object o) {
throw new UnsupportedOperationException("Cannot add here");
}
- public boolean remove(Object o){
- return container.remove(o) != null;
+ public boolean remove(Object o) {
+ return container.remove(o) != null;
}
- public boolean containsAll(Collection c){
+ public boolean containsAll(Collection c) {
boolean result = true;
- for (Object key:c) {
- if (!(result&=container.containsKey(key))) {
+ for (Object key : c) {
+ if (!(result &= container.containsKey(key))) {
break;
}
}
- return result;
+ return result;
}
- public boolean addAll(Collection c){
+ public boolean addAll(Collection c) {
throw new UnsupportedOperationException("Cannot add here");
}
- public boolean retainAll(Collection c){
+ public boolean retainAll(Collection c) {
List tmpList = new ArrayList();
- for (Iterator i = c.iterator(); i.hasNext(); ){
+ for (Iterator i = c.iterator(); i.hasNext();) {
Object o = i.next();
- if (!contains(o)){
+ if (!contains(o)) {
tmpList.add(o);
- }
+ }
}
- for(Iterator i = tmpList.iterator(); i.hasNext();){
+ for (Iterator i = tmpList.iterator(); i.hasNext();) {
remove(i.next());
}
return !tmpList.isEmpty();
}
- public boolean removeAll(Collection c){
+ public boolean removeAll(Collection c) {
boolean result = true;
- for (Iterator i = c.iterator(); i.hasNext(); ){
- if (!remove(i.next())){
+ for (Iterator i = c.iterator(); i.hasNext();) {
+ if (!remove(i.next())) {
result = false;
}
}
return result;
}
- public void clear(){
- container.clear();
+ public void clear() {
+ container.clear();
}
-
+
public String toString() {
- StringBuffer result =new StringBuffer(32);
+ StringBuffer result = new StringBuffer(32);
result.append("ContainerKeySet[");
IndexItem item = container.getInternalList().getRoot();
while ((item = container.getInternalList().getNextEntry(item)) != null) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java Wed Aug 8 11:56:59 2007
@@ -20,38 +20,37 @@
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
-
/**
-*Iterator for the set of keys for a container
-*
-* @version $Revision: 1.2 $
-*/
-public class ContainerKeySetIterator implements Iterator{
+ * Iterator for the set of keys for a container
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ContainerKeySetIterator implements Iterator {
private MapContainerImpl container;
private IndexLinkedList list;
protected IndexItem nextItem;
protected IndexItem currentItem;
-
- ContainerKeySetIterator(MapContainerImpl container){
+
+ ContainerKeySetIterator(MapContainerImpl container) {
this.container = container;
- this.list=container.getInternalList();
- this.currentItem=list.getRoot();
- this.nextItem=list.getNextEntry(currentItem);
+ this.list = container.getInternalList();
+ this.currentItem = list.getRoot();
+ this.nextItem = list.getNextEntry(currentItem);
}
-
- public boolean hasNext(){
- return nextItem!=null;
+
+ public boolean hasNext() {
+ return nextItem != null;
}
- public Object next(){
- currentItem=nextItem;
- Object result=container.getKey(nextItem);
- nextItem=list.getNextEntry(nextItem);
+ public Object next() {
+ currentItem = nextItem;
+ Object result = container.getKey(nextItem);
+ nextItem = list.getNextEntry(nextItem);
return result;
}
- public void remove(){
- if(currentItem!=null){
+ public void remove() {
+ if (currentItem != null) {
container.remove(currentItem);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerListIterator.java Wed Aug 8 11:56:59 2007
@@ -22,10 +22,10 @@
/**
* @version $Revision: 1.2 $
*/
-public class ContainerListIterator extends ContainerValueCollectionIterator implements ListIterator{
+public class ContainerListIterator extends ContainerValueCollectionIterator implements ListIterator {
- protected ContainerListIterator(ListContainerImpl container,IndexLinkedList list,IndexItem start){
- super(container,list,start);
+ protected ContainerListIterator(ListContainerImpl container, IndexLinkedList list, IndexItem start) {
+ super(container, list, start);
}
/*
@@ -33,10 +33,10 @@
*
* @see java.util.ListIterator#hasPrevious()
*/
- public boolean hasPrevious(){
- synchronized(container){
- nextItem=(IndexItem)list.refreshEntry(nextItem);
- return list.getPrevEntry(nextItem)!=null;
+ public boolean hasPrevious() {
+ synchronized (container) {
+ nextItem = (IndexItem)list.refreshEntry(nextItem);
+ return list.getPrevEntry(nextItem) != null;
}
}
@@ -45,11 +45,11 @@
*
* @see java.util.ListIterator#previous()
*/
- public Object previous(){
- synchronized(container){
- nextItem=(IndexItem)list.refreshEntry(nextItem);
- nextItem=list.getPrevEntry(nextItem);
- return nextItem!=null?container.getValue(nextItem):null;
+ public Object previous() {
+ synchronized (container) {
+ nextItem = (IndexItem)list.refreshEntry(nextItem);
+ nextItem = list.getPrevEntry(nextItem);
+ return nextItem != null ? container.getValue(nextItem) : null;
}
}
@@ -58,14 +58,14 @@
*
* @see java.util.ListIterator#nextIndex()
*/
- public int nextIndex(){
- int result=-1;
- if(nextItem!=null){
- synchronized(container){
- nextItem=(IndexItem)list.refreshEntry(nextItem);
- StoreEntry next=list.getNextEntry(nextItem);
- if(next!=null){
- result=container.getInternalList().indexOf(next);
+ public int nextIndex() {
+ int result = -1;
+ if (nextItem != null) {
+ synchronized (container) {
+ nextItem = (IndexItem)list.refreshEntry(nextItem);
+ StoreEntry next = list.getNextEntry(nextItem);
+ if (next != null) {
+ result = container.getInternalList().indexOf(next);
}
}
}
@@ -77,14 +77,14 @@
*
* @see java.util.ListIterator#previousIndex()
*/
- public int previousIndex(){
- int result=-1;
- if(nextItem!=null){
- synchronized(container){
- nextItem=(IndexItem)list.refreshEntry(nextItem);
- StoreEntry prev=list.getPrevEntry(nextItem);
- if(prev!=null){
- result=container.getInternalList().indexOf(prev);
+ public int previousIndex() {
+ int result = -1;
+ if (nextItem != null) {
+ synchronized (container) {
+ nextItem = (IndexItem)list.refreshEntry(nextItem);
+ StoreEntry prev = list.getPrevEntry(nextItem);
+ if (prev != null) {
+ result = container.getInternalList().indexOf(prev);
}
}
}
@@ -96,9 +96,9 @@
*
* @see java.util.ListIterator#set(E)
*/
- public void set(Object o){
- IndexItem item=((ListContainerImpl)container).internalSet(previousIndex()+1,o);
- nextItem=item;
+ public void set(Object o) {
+ IndexItem item = ((ListContainerImpl)container).internalSet(previousIndex() + 1, o);
+ nextItem = item;
}
/*
@@ -106,8 +106,8 @@
*
* @see java.util.ListIterator#add(E)
*/
- public void add(Object o){
- IndexItem item=((ListContainerImpl)container).internalAdd(previousIndex()+1,o);
- nextItem=item;
+ public void add(Object o) {
+ IndexItem item = ((ListContainerImpl)container).internalAdd(previousIndex() + 1, o);
+ nextItem = item;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerMapEntry.java Wed Aug 8 11:56:59 2007
@@ -20,34 +20,30 @@
import org.apache.activemq.kaha.MapContainer;
/**
-* Map.Entry implementation for a container
-*
-* @version $Revision: 1.2 $
-*/
+ * Map.Entry implementation for a container
+ *
+ * @version $Revision: 1.2 $
+ */
class ContainerMapEntry implements Map.Entry {
private MapContainer container;
private Object key;
-
- ContainerMapEntry(MapContainer container,Object key){
+
+ ContainerMapEntry(MapContainer container, Object key) {
this.container = container;
this.key = key;
-
+
}
-
-
- public Object getKey(){
+
+ public Object getKey() {
return key;
}
- public Object getValue(){
+ public Object getValue() {
return container.get(key);
}
- public Object setValue(Object value){
+ public Object setValue(Object value) {
return container.put(key, value);
}
}
-
-
-
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollection.java Wed Aug 8 11:56:59 2007
@@ -26,59 +26,53 @@
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
/**
-* Values collection for the MapContainer
-*
-* @version $Revision: 1.2 $
-*/
-class ContainerValueCollection extends ContainerCollectionSupport implements Collection{
-
-
- ContainerValueCollection(MapContainerImpl container){
+ * Values collection for the MapContainer
+ *
+ * @version $Revision: 1.2 $
+ */
+class ContainerValueCollection extends ContainerCollectionSupport implements Collection {
+
+ ContainerValueCollection(MapContainerImpl container) {
super(container);
}
-
-
-
- public boolean contains(Object o){
+
+ public boolean contains(Object o) {
return container.containsValue(o);
}
-
- public Iterator iterator(){
- IndexLinkedList list=container.getItemList();
- return new ContainerValueCollectionIterator(container,list,list.getRoot());
+ public Iterator iterator() {
+ IndexLinkedList list = container.getItemList();
+ return new ContainerValueCollectionIterator(container, list, list.getRoot());
}
-
- public Object[] toArray(){
+ public Object[] toArray() {
Object[] result = null;
IndexLinkedList list = container.getItemList();
- synchronized(list){
+ synchronized (list) {
result = new Object[list.size()];
IndexItem item = list.getFirst();
int count = 0;
- while (item != null){
- Object value=container.getValue(item);
+ while (item != null) {
+ Object value = container.getValue(item);
result[count++] = value;
-
+
item = list.getNextEntry(item);
}
-
-
+
}
return result;
}
- public Object[] toArray(Object[] result){
- IndexLinkedList list=container.getItemList();
- synchronized(list){
- if(result.length<=list.size()){
+ public Object[] toArray(Object[] result) {
+ IndexLinkedList list = container.getItemList();
+ synchronized (list) {
+ if (result.length <= list.size()) {
IndexItem item = list.getFirst();
int count = 0;
- while (item != null){
- Object value=container.getValue(item);
+ while (item != null) {
+ Object value = container.getValue(item);
result[count++] = value;
-
+
item = list.getNextEntry(item);
}
}
@@ -86,21 +80,18 @@
return result;
}
-
- public boolean add(Object o){
- throw new UnsupportedOperationException("Can't add an object here");
+ public boolean add(Object o) {
+ throw new UnsupportedOperationException("Can't add an object here");
}
-
- public boolean remove(Object o){
+ public boolean remove(Object o) {
return container.removeValue(o);
}
-
- public boolean containsAll(Collection c){
+ public boolean containsAll(Collection c) {
boolean result = !c.isEmpty();
- for (Iterator i = c.iterator(); i.hasNext(); ){
- if (!contains(i.next())){
+ for (Iterator i = c.iterator(); i.hasNext();) {
+ if (!contains(i.next())) {
result = false;
break;
}
@@ -108,38 +99,34 @@
return result;
}
-
- public boolean addAll(Collection c){
- throw new UnsupportedOperationException("Can't add everything here!");
+ public boolean addAll(Collection c) {
+ throw new UnsupportedOperationException("Can't add everything here!");
}
-
- public boolean removeAll(Collection c){
+ public boolean removeAll(Collection c) {
boolean result = true;
- for (Iterator i = c.iterator(); i.hasNext(); ){
+ for (Iterator i = c.iterator(); i.hasNext();) {
Object obj = i.next();
- result&=remove(obj);
+ result &= remove(obj);
}
return result;
}
-
- public boolean retainAll(Collection c){
- List tmpList = new ArrayList();
- for (Iterator i = c.iterator(); i.hasNext(); ){
- Object o = i.next();
- if (!contains(o)){
- tmpList.add(o);
- }
- }
- for(Iterator i = tmpList.iterator(); i.hasNext();){
- remove(i.next());
- }
- return !tmpList.isEmpty();
- }
-
-
- public void clear(){
- container.clear();
+ public boolean retainAll(Collection c) {
+ List tmpList = new ArrayList();
+ for (Iterator i = c.iterator(); i.hasNext();) {
+ Object o = i.next();
+ if (!contains(o)) {
+ tmpList.add(o);
+ }
+ }
+ for (Iterator i = tmpList.iterator(); i.hasNext();) {
+ remove(i.next());
+ }
+ return !tmpList.isEmpty();
+ }
+
+ public void clear() {
+ container.clear();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerValueCollectionIterator.java Wed Aug 8 11:56:59 2007
@@ -23,38 +23,38 @@
*
* @version $Revision: 1.2 $
*/
-public class ContainerValueCollectionIterator implements Iterator{
+public class ContainerValueCollectionIterator implements Iterator {
protected BaseContainerImpl container;
protected IndexLinkedList list;
protected IndexItem nextItem;
protected IndexItem currentItem;
- ContainerValueCollectionIterator(BaseContainerImpl container,IndexLinkedList list,IndexItem start){
- this.container=container;
- this.list=list;
- this.currentItem=start;
- this.nextItem=list.getNextEntry((IndexItem)list.refreshEntry(start));
+ ContainerValueCollectionIterator(BaseContainerImpl container, IndexLinkedList list, IndexItem start) {
+ this.container = container;
+ this.list = list;
+ this.currentItem = start;
+ this.nextItem = list.getNextEntry((IndexItem)list.refreshEntry(start));
}
- public boolean hasNext(){
- return nextItem!=null;
+ public boolean hasNext() {
+ return nextItem != null;
}
- public Object next(){
- synchronized(container){
- nextItem=(IndexItem)list.refreshEntry(nextItem);
- currentItem=nextItem;
- Object result=container.getValue(nextItem);
- nextItem=list.getNextEntry(nextItem);
+ public Object next() {
+ synchronized (container) {
+ nextItem = (IndexItem)list.refreshEntry(nextItem);
+ currentItem = nextItem;
+ Object result = container.getValue(nextItem);
+ nextItem = list.getNextEntry(nextItem);
return result;
}
}
- public void remove(){
- synchronized(container){
- if(currentItem!=null){
- currentItem=(IndexItem)list.refreshEntry(currentItem);
+ public void remove() {
+ synchronized (container) {
+ if (currentItem != null) {
+ currentItem = (IndexItem)list.refreshEntry(currentItem);
container.remove(currentItem);
}
}