You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/02/06 23:24:58 UTC
svn commit: r1241221 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/store/kahadb/
kahadb/src/main/java/org/apache/kahadb/journal/
kahadb/src/main/java/org/apache/kahadb/page/
kahadb/src/main/java/org/apache/kahadb/util/ kahadb/src/test/java/org/apache/kahadb/index/ kahadb/src/test/java/org/apache/kahadb/journal/
Author: tabish
Date: Mon Feb 6 22:24:58 2012
New Revision: 1241221
URL: http://svn.apache.org/viewvc?rev=1241221&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3702
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Feb 6 22:24:58 2012
@@ -55,7 +55,7 @@ public abstract class MessageDatabase ex
protected BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
- public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
+ public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
public static final File DEFAULT_DIRECTORY = new File("KahaDB");
protected static final Buffer UNMATCHED;
static {
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java Mon Feb 6 22:24:58 2012
@@ -25,7 +25,7 @@ import java.util.Map;
/**
* Used to pool DataFileAccessors.
- *
+ *
* @author chirino
*/
public class DataFileAccessorPool {
@@ -95,8 +95,7 @@ public class DataFileAccessorPool {
}
synchronized void clearUsedMark() {
- for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
- Pool pool = iter.next();
+ for (Pool pool : pools.values()) {
pool.clearUsedMark();
}
}
@@ -153,8 +152,7 @@ public class DataFileAccessorPool {
return;
}
closed = true;
- for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
- Pool pool = iter.next();
+ for (Pool pool : pools.values()) {
pool.dispose();
}
pools.clear();
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Mon Feb 6 22:24:58 2012
@@ -28,20 +28,21 @@ import java.util.zip.Checksum;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LinkedNodeList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An optimized writer to do batch appends to a data file. This object is thread
* safe and gains throughput as you increase the number of concurrent writes it
* does.
- *
- *
*/
class DataFileAppender implements FileAppender {
+ private static final Logger logger = LoggerFactory.getLogger(DataFileAppender.class);
+
protected final Journal journal;
protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
- protected final Object enqueueMutex = new Object() {
- };
+ protected final Object enqueueMutex = new Object();
protected WriteBatch nextWriteBatch;
protected boolean shutdown;
@@ -90,7 +91,7 @@ class DataFileAppender implements FileAp
public WriteBatch(DataFile dataFile,int offset) {
this.dataFile = dataFile;
- this.offset = offset;
+ this.offset = offset;
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
@@ -103,7 +104,7 @@ class DataFileAppender implements FileAp
public boolean canAppend(Journal.WriteCommand write) {
int newSize = size + write.location.getSize();
- if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
+ if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
return false;
}
return true;
@@ -114,7 +115,7 @@ class DataFileAppender implements FileAp
write.location.setDataFileId(dataFile.getDataFileId());
write.location.setOffset(offset+size);
int s = write.location.getSize();
- size += s;
+ size += s;
dataFile.incrementLength(s);
journal.addToTotalLength(s);
}
@@ -131,7 +132,7 @@ class DataFileAppender implements FileAp
}
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@@ -149,11 +150,11 @@ class DataFileAppender implements FileAp
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
- IOException exception = batch.exception.get();
+ IOException exception = batch.exception.get();
if (exception != null) {
- throw exception;
+ throw exception;
}
- }
+ }
return location;
}
@@ -169,7 +170,7 @@ class DataFileAppender implements FileAp
Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
WriteBatch batch = enqueue(write);
-
+
location.setLatch(batch.latch);
return location;
}
@@ -179,7 +180,7 @@ class DataFileAppender implements FileAp
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
-
+
if (!running) {
running = true;
thread = new Thread() {
@@ -193,44 +194,45 @@ class DataFileAppender implements FileAp
thread.start();
firstAsyncException = null;
}
-
+
if (firstAsyncException != null) {
throw firstAsyncException;
}
while ( true ) {
- if (nextWriteBatch == null) {
- DataFile file = journal.getCurrentWriteFile();
- if( file.getLength() > journal.getMaxFileLength() ) {
- file = journal.rotateWriteFile();
- }
-
- nextWriteBatch = newWriteBatch(write, file);
- enqueueMutex.notifyAll();
- break;
- } else {
- // Append to current batch if possible..
- if (nextWriteBatch.canAppend(write)) {
- nextWriteBatch.append(write);
- break;
- } else {
- // Otherwise wait for the queuedCommand to be null
- try {
- while (nextWriteBatch != null) {
- final long start = System.currentTimeMillis();
- enqueueMutex.wait();
- if (maxStat > 0) {
- System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
- }
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- if (shutdown) {
- throw new IOException("Async Writter Thread Shutdown");
- }
- }
- }
+ if (nextWriteBatch == null) {
+ DataFile file = journal.getCurrentWriteFile();
+ if( file.getLength() > journal.getMaxFileLength() ) {
+ file = journal.rotateWriteFile();
+ }
+
+ nextWriteBatch = newWriteBatch(write, file);
+ enqueueMutex.notifyAll();
+ break;
+ } else {
+ // Append to current batch if possible..
+ if (nextWriteBatch.canAppend(write)) {
+ nextWriteBatch.append(write);
+ break;
+ } else {
+ // Otherwise wait for the queuedCommand to be null
+ try {
+ while (nextWriteBatch != null) {
+ final long start = System.currentTimeMillis();
+ enqueueMutex.wait();
+ if (maxStat > 0) {
+ logger.info("Watiting for write to finish with full batch... millis: " +
+ (System.currentTimeMillis() - start));
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ if (shutdown) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+ }
+ }
}
if (!write.sync) {
inflightWrites.put(new Journal.WriteKey(write.location), write);
@@ -282,13 +284,11 @@ class DataFileAppender implements FileAp
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
while (true) {
- Object o = null;
-
// Block till we get a command.
synchronized (enqueueMutex) {
while (true) {
if (nextWriteBatch != null) {
- o = nextWriteBatch;
+ wb = nextWriteBatch;
nextWriteBatch = null;
break;
}
@@ -300,7 +300,6 @@ class DataFileAppender implements FileAp
enqueueMutex.notifyAll();
}
- wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
file.setLength(dataFile.getLength());
@@ -333,15 +332,15 @@ class DataFileAppender implements FileAp
}
ByteSequence sequence = buff.toByteSequence();
-
- // Now we can fill in the batch control record properly.
+
+ // Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
if( journal.isChecksum() ) {
- Checksum checksum = new Adler32();
- checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
- buff.writeLong(checksum.getValue());
+ Checksum checksum = new Adler32();
+ checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeLong(checksum.getValue());
}
// Now do the 1 big write.
@@ -354,16 +353,16 @@ class DataFileAppender implements FileAp
for (;statIdx > 0;) {
all+= stats[--statIdx];
}
- System.err.println("Ave writeSize: " + all/maxStat);
+ logger.info("Ave writeSize: " + all/maxStat);
}
}
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
-
+
ReplicationTarget replicationTarget = journal.getReplicationTarget();
if( replicationTarget!=null ) {
- replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+ replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
}
-
+
if (forceToDisk) {
file.getFD().sync();
}
@@ -411,7 +410,7 @@ class DataFileAppender implements FileAp
try {
write.onComplete.run();
} catch (Throwable e) {
- e.printStackTrace();
+ logger.info("Add exception was raised while executing the run command for onComplete", e);
}
}
write = write.getNext();
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Mon Feb 6 22:24:58 2012
@@ -48,8 +48,8 @@ import org.apache.kahadb.util.Sequence;
/**
* Manages DataFiles
- *
- *
+ *
+ *
*/
public class Journal {
public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
@@ -57,12 +57,12 @@ public class Journal {
private static final int MAX_BATCH_SIZE = 32*1024*1024;
- // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
+ // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
public static final int RECORD_HEAD_SPACE = 4 + 1;
-
+
public static final byte USER_RECORD_TYPE = 1;
public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
- // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
+ // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
@@ -77,7 +77,7 @@ public class Journal {
sequence.compact();
return sequence.getData();
} catch (IOException e) {
- throw new RuntimeException("Could not create batch control record header.");
+ throw new RuntimeException("Could not create batch control record header.", e);
}
}
@@ -89,7 +89,7 @@ public class Journal {
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
public static final int PREFERED_DIFF = 1024 * 512;
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
-
+
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
@@ -99,11 +99,11 @@ public class Journal {
protected String filePrefix = DEFAULT_FILE_PREFIX;
protected String fileSuffix = DEFAULT_FILE_SUFFIX;
protected boolean started;
-
+
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
-
+
protected FileAppender appender;
protected DataFileAccessorPool accessorPool;
@@ -115,18 +115,17 @@ public class Journal {
protected Runnable cleanupTask;
protected AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
- private ReplicationTarget replicationTarget;
+ private ReplicationTarget replicationTarget;
protected boolean checksum;
protected boolean checkForCorruptionOnStartup;
protected boolean enableAsyncDiskSync = true;
private Timer timer;
-
public synchronized void start() throws IOException {
if (started) {
return;
}
-
+
long start = System.currentTimeMillis();
accessorPool = new DataFileAccessorPool(this);
started = true;
@@ -141,9 +140,8 @@ public class Journal {
});
if (files != null) {
- for (int i = 0; i < files.length; i++) {
+ for (File file : files) {
try {
- File file = files[i];
String n = file.getName();
String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
int num = Integer.parseInt(numStr);
@@ -174,7 +172,7 @@ public class Journal {
}
}
- getCurrentWriteFile();
+ getCurrentWriteFile();
if( lastAppendLocation.get()==null ) {
DataFile df = dataFiles.getTail();
@@ -194,19 +192,19 @@ public class Journal {
}
private static byte[] bytes(String string) {
- try {
- return string.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
+ try {
+ return string.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
- protected Location recoveryCheck(DataFile dataFile) throws IOException {
+ protected Location recoveryCheck(DataFile dataFile) throws IOException {
Location location = new Location();
location.setDataFileId(dataFile.getDataFileId());
location.setOffset(0);
- DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
while( true ) {
int size = checkBatchRecord(reader, location.getOffset());
@@ -227,9 +225,9 @@ public class Journal {
}
}
}
-
+
} catch (IOException e) {
- } finally {
+ } finally {
accessorPool.closeDataFileAccessor(reader);
}
@@ -315,14 +313,14 @@ public class Journal {
}
- void addToTotalLength(int size) {
- totalLength.addAndGet(size);
- }
+ void addToTotalLength(int size) {
+ totalLength.addAndGet(size);
+ }
public long length() {
return totalLength.get();
}
-
+
synchronized DataFile getCurrentWriteFile() throws IOException {
if (dataFiles.isEmpty()) {
rotateWriteFile();
@@ -331,21 +329,21 @@ public class Journal {
}
synchronized DataFile rotateWriteFile() {
- int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
- File file = getFile(nextNum);
- DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
- // actually allocate the disk space
- fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
- fileByFileMap.put(file, nextWriteFile);
- dataFiles.addLast(nextWriteFile);
- return nextWriteFile;
- }
-
- public File getFile(int nextNum) {
- String fileName = filePrefix + nextNum + fileSuffix;
- File file = new File(directory, fileName);
- return file;
- }
+ int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+ File file = getFile(nextNum);
+ DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+ // actually allocate the disk space
+ fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+ fileByFileMap.put(file, nextWriteFile);
+ dataFiles.addLast(nextWriteFile);
+ return nextWriteFile;
+ }
+
+ public File getFile(int nextNum) {
+ String fileName = filePrefix + nextNum + fileSuffix;
+ File file = new File(directory, fileName);
+ return file;
+ }
synchronized DataFile getDataFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
@@ -419,12 +417,12 @@ public class Journal {
public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
for (Integer key : files) {
// Can't remove the data file (or subsequent files) that is currently being written to.
- if( key >= lastAppendLocation.get().getDataFileId() ) {
- continue;
- }
+ if( key >= lastAppendLocation.get().getDataFileId() ) {
+ continue;
+ }
DataFile dataFile = fileMap.get(key);
if( dataFile!=null ) {
- forceRemoveDataFile(dataFile);
+ forceRemoveDataFile(dataFile);
}
}
}
@@ -440,9 +438,9 @@ public class Journal {
LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
} else {
if ( dataFile.delete() ) {
- LOG.debug("Discarded data file " + dataFile);
+ LOG.debug("Discarded data file " + dataFile);
} else {
- LOG.warn("Failed to discard data file " + dataFile.getFile());
+ LOG.warn("Failed to discard data file " + dataFile.getFile());
}
}
}
@@ -466,14 +464,14 @@ public class Journal {
return directory.toString();
}
- public synchronized void appendedExternally(Location loc, int length) throws IOException {
- DataFile dataFile = null;
- if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
- // It's an update to the current log file..
- dataFile = dataFiles.getTail();
- dataFile.incrementLength(length);
- } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
- // It's an update to the next log file.
+ public synchronized void appendedExternally(Location loc, int length) throws IOException {
+ DataFile dataFile = null;
+ if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
+ // It's an update to the current log file..
+ dataFile = dataFiles.getTail();
+ dataFile.incrementLength(length);
+ } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
+ // It's an update to the next log file.
int nextNum = loc.getDataFileId();
File file = getFile(nextNum);
dataFile = new DataFile(file, nextNum, preferedFileLength);
@@ -481,10 +479,10 @@ public class Journal {
fileMap.put(dataFile.getDataFileId(), dataFile);
fileByFileMap.put(file, dataFile);
dataFiles.addLast(dataFile);
- } else {
- throw new IOException("Invalid external append.");
- }
- }
+ } else {
+ throw new IOException("Invalid external append.");
+ }
+ }
public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
@@ -494,7 +492,7 @@ public class Journal {
if (location == null) {
DataFile head = dataFiles.getHead();
if( head == null ) {
- return null;
+ return null;
}
cur = new Location();
cur.setDataFileId(head.getDataFileId());
@@ -528,7 +526,7 @@ public class Journal {
// Load in location size and type.
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
- reader.readLocationDetails(cur);
+ reader.readLocationDetails(cur);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
@@ -682,7 +680,7 @@ public class Journal {
/**
* Get a set of files - only valid after start()
- *
+ *
* @return files currently being used
*/
public Set<File> getFiles() {
@@ -692,7 +690,7 @@ public class Journal {
public synchronized Map<Integer, DataFile> getFileMap() {
return new TreeMap<Integer, DataFile>(fileMap);
}
-
+
public long getDiskSize() {
long tailLength=0;
synchronized( this ) {
@@ -700,9 +698,9 @@ public class Journal {
tailLength = dataFiles.getTail().getLength();
}
}
-
+
long rc = totalLength.get();
-
+
// The last file is actually at a minimum preferedFileLength big.
if( tailLength < preferedFileLength ) {
rc -= tailLength;
@@ -711,12 +709,12 @@ public class Journal {
return rc;
}
- public void setReplicationTarget(ReplicationTarget replicationTarget) {
- this.replicationTarget = replicationTarget;
- }
- public ReplicationTarget getReplicationTarget() {
- return replicationTarget;
- }
+ public void setReplicationTarget(ReplicationTarget replicationTarget) {
+ this.replicationTarget = replicationTarget;
+ }
+ public ReplicationTarget getReplicationTarget() {
+ return replicationTarget;
+ }
public String getFileSuffix() {
return fileSuffix;
@@ -726,13 +724,13 @@ public class Journal {
this.fileSuffix = fileSuffix;
}
- public boolean isChecksum() {
- return checksum;
- }
-
- public void setChecksum(boolean checksumWrites) {
- this.checksum = checksumWrites;
- }
+ public boolean isChecksum() {
+ return checksum;
+ }
+
+ public void setChecksum(boolean checksumWrites) {
+ this.checksum = checksumWrites;
+ }
public boolean isCheckForCorruptionOnStartup() {
return checkForCorruptionOnStartup;
@@ -745,7 +743,7 @@ public class Journal {
public void setWriteBatchSize(int writeBatchSize) {
this.writeBatchSize = writeBatchSize;
}
-
+
public int getWriteBatchSize() {
return writeBatchSize;
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java Mon Feb 6 22:24:58 2012
@@ -19,19 +19,9 @@ package org.apache.kahadb.page;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Marshaller;
/**
* A Page within a file.
- *
- *
*/
public class Page<T> {
@@ -48,7 +38,7 @@ public class Page<T> {
long txId;
// A field reserved to hold checksums.. Not in use (yet)
int checksum;
-
+
// Points to the next page in the chunk stream
long next;
T data;
@@ -60,18 +50,17 @@ public class Page<T> {
this.pageId=pageId;
}
- public void copy(Page<T> other) {
+ public Page<T> copy(Page<T> other) {
this.pageId = other.pageId;
this.txId = other.txId;
this.type = other.type;
this.next = other.next;
this.data = other.data;
+ return this;
}
Page<T> copy() {
- Page<T> rc = new Page<T>();
- rc.copy(this);
- return rc;
+ return new Page<T>().copy(this);
}
void makeFree(long txId) {
@@ -80,13 +69,13 @@ public class Page<T> {
this.data = null;
this.next = 0;
}
-
+
public void makePagePart(long next, long txId) {
this.type = Page.PAGE_PART_TYPE;
this.next = next;
this.txId = txId;
}
-
+
public void makePageEnd(long size, long txId) {
this.type = Page.PAGE_END_TYPE;
this.next = size;
@@ -142,6 +131,4 @@ public class Page<T> {
public void setChecksum(int checksum) {
this.checksum = checksum;
}
-
-
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Feb 6 22:24:58 2012
@@ -60,10 +60,10 @@ public class PageFile {
private static final String FREE_FILE_SUFFIX = ".free";
// 4k Default page size.
- public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "" + 1024 * 4));
- public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", "" + 1000));
- public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", "" + 100));
- ;
+ public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
+ public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
+ public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);;
+
private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
@@ -87,14 +87,14 @@ public class PageFile {
// The minimum number of space allocated to the recovery file in number of pages.
private int recoveryFileMinPageCount = 1000;
- // The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize
+ // The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize
// to this max size as soon as possible.
private int recoveryFileMaxPageCount = 10000;
// The number of pages in the current recovery buffer
private int recoveryPageCount;
private AtomicBoolean loaded = new AtomicBoolean();
- // The number of pages we are aiming to write every time we
+ // The number of pages we are aiming to write every time we
// write to disk.
int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
@@ -113,7 +113,7 @@ public class PageFile {
// Will writes be done in an async thread?
private boolean enabledWriteThread = false;
- // These are used if enableAsyncWrites==true
+ // These are used if enableAsyncWrites==true
private AtomicBoolean stopWriter = new AtomicBoolean();
private Thread writerThread;
private CountDownLatch checkpointLatch;
@@ -127,7 +127,7 @@ public class PageFile {
private AtomicLong nextTxid = new AtomicLong();
- // Persistent settings stored in the page file.
+ // Persistent settings stored in the page file.
private MetaData metaData;
private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
@@ -198,13 +198,11 @@ public class PageFile {
void begin() {
if (currentLocation != -1) {
diskBoundLocation = currentLocation;
- currentLocation = -1;
- current = null;
} else {
diskBound = current;
- current = null;
- currentLocation = -1;
}
+ current = null;
+ currentLocation = -1;
}
/**
@@ -219,7 +217,6 @@ public class PageFile {
boolean isDone() {
return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
}
-
}
/**
@@ -336,10 +333,8 @@ public class PageFile {
* @throws IOException
*/
private void delete(File file) throws IOException {
- if (file.exists()) {
- if (!file.delete()) {
- throw new IOException("Could not delete: " + file.getPath());
- }
+ if (file.exists() && !file.delete()) {
+ throw new IOException("Could not delete: " + file.getPath());
}
}
@@ -407,13 +402,12 @@ public class PageFile {
// Scan all to find the free pages.
freeList = new SequenceSet();
- for (Iterator i = tx().iterator(true); i.hasNext(); ) {
- Page page = (Page) i.next();
+ for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
+ Page page = i.next();
if (page.getType() == Page.PAGE_FREE_TYPE) {
freeList.add(page.getPageId());
}
}
-
}
metaData.setCleanShutdown(false);
@@ -427,7 +421,7 @@ public class PageFile {
startWriter();
} else {
- throw new IllegalStateException("Cannot load the page file when it is allready loaded.");
+ throw new IllegalStateException("Cannot load the page file when it is already loaded.");
}
}
@@ -516,7 +510,9 @@ public class PageFile {
try {
checkpointLatch.await();
} catch (InterruptedException e) {
- throw new InterruptedIOException();
+ InterruptedIOException ioe = new InterruptedIOException();
+ ioe.initCause(e);
+ throw ioe;
}
}
@@ -597,7 +593,7 @@ public class PageFile {
ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
p.store(os, "");
if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
- throw new IOException("Configuation is to larger than: " + PAGE_FILE_HEADER_SIZE / 2);
+ throw new IOException("Configuation is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
}
// Fill the rest with space...
byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
@@ -632,7 +628,7 @@ public class PageFile {
}
///////////////////////////////////////////////////////////////////
- // Property Accessors
+ // Property Accessors
///////////////////////////////////////////////////////////////////
/**
@@ -773,7 +769,6 @@ public class PageFile {
}
public void setWriteBatchSize(int writeBatchSize) {
- assertNotLoaded();
this.writeBatchSize = writeBatchSize;
}
@@ -833,9 +828,14 @@ public class PageFile {
Page<T> first = null;
int c = count;
- while (c > 0) {
- Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
- page.makeFree(getNextWriteTransactionId());
+
+ // Perform the id's only once....
+ long pageId = nextFreePageId.getAndAdd(count);
+ long writeTxnId = nextTxid.getAndAdd(count);
+
+ while (c-- > 0) {
+ Page<T> page = new Page<T>(pageId++);
+ page.makeFree(writeTxnId++);
if (first == null) {
first = page;
@@ -847,7 +847,6 @@ public class PageFile {
write(page, out.getData());
// LOG.debug("allocate writing: "+page.getPageId());
- c--;
}
return first;
@@ -985,9 +984,6 @@ public class PageFile {
// Internal Double write implementation follows...
///////////////////////////////////////////////////////////////////
- /**
- *
- */
private void pollWrites() {
try {
while (!stopWriter.get()) {
@@ -1007,7 +1003,7 @@ public class PageFile {
writeBatch();
}
} catch (Throwable e) {
- e.printStackTrace();
+ LOG.info("An exception was raised while performing poll writes", e);
} finally {
releaseCheckpointWaiter();
}
@@ -1165,7 +1161,7 @@ public class PageFile {
batch.put(offset, data);
}
} catch (Exception e) {
- // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
+ // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
// as the pages should still be consistent.
LOG.debug("Redo buffer was not fully intact: ", e);
return nextTxId;
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Mon Feb 6 22:24:58 2012
@@ -39,6 +39,8 @@ public class Transaction implements Iter
* and it's data is larger than what would fit into a single page.
*/
public class PageOverflowIOException extends IOException {
+ private static final long serialVersionUID = 1L;
+
public PageOverflowIOException(String message) {
super(message);
}
@@ -49,6 +51,8 @@ public class Transaction implements Iter
* with an invalid page id.
*/
public class InvalidPageIOException extends IOException {
+ private static final long serialVersionUID = 1L;
+
private final long page;
public InvalidPageIOException(String message, long page) {
@@ -92,7 +96,7 @@ public class Transaction implements Iter
// List of pages freed in this transaction
private final SequenceSet freeList = new SequenceSet();
- private long maxTransactionSize = Long.parseLong(System.getProperty("maxKahaDBTxSize", "" + 10485760));
+ private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
private long size = 0;
@@ -178,12 +182,14 @@ public class Transaction implements Iter
public <T> void free(Page<T> page, int count) throws IOException {
pageFile.assertLoaded();
long offsetPage = page.getPageId();
- for (int i = 0; i < count; i++) {
+ while (count-- > 0) {
if (page == null) {
- page = load(offsetPage + i, null);
+ page = load(offsetPage, null);
}
free(page);
page = null;
+ // Advance to the next page id explicitly; the loop now counts down, so there is no index to add to the base offset.
+ offsetPage++;
}
}
@@ -318,7 +324,6 @@ public class Transaction implements Iter
}
- @SuppressWarnings("unchecked")
@Override
public void close() throws IOException {
super.close();
@@ -551,7 +556,6 @@ public class Transaction implements Iter
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- @SuppressWarnings("unchecked")
public Iterator<Page> iterator() {
return (Iterator<Page>)iterator(false);
}
@@ -569,6 +573,7 @@ public class Transaction implements Iter
pageFile.assertLoaded();
return new Iterator<Page>() {
+
long nextId;
Page nextPage;
Page lastPage;
@@ -699,7 +704,6 @@ public class Transaction implements Iter
/**
* Queues up a page write that should get done when commit() gets called.
*/
- @SuppressWarnings("unchecked")
private void write(final Page page, byte[] data) throws IOException {
Long key = page.getPageId();
@@ -707,7 +711,7 @@ public class Transaction implements Iter
size = writes.size() * pageFile.getPageSize();
PageWrite write;
-
+
if (size > maxTransactionSize) {
if (tmpFile == null) {
tmpFile = new RandomAccessFile(getTempFile(), "rw");
@@ -796,5 +800,4 @@ public class Transaction implements Iter
}
}
}
-
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java Mon Feb 6 22:24:58 2012
@@ -18,19 +18,18 @@ package org.apache.kahadb.util;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Stack;
-/**
- *
- */
public final class IOHelper {
+
protected static final int MAX_DIR_NAME_LENGTH;
protected static final int MAX_FILE_NAME_LENGTH;
private static final int DEFAULT_BUFFER_SIZE = 4096;
+
private IOHelper() {
}
@@ -65,18 +64,18 @@ public final class IOHelper {
public static String toFileSystemDirectorySafeName(String name) {
return toFileSystemSafeName(name, true, MAX_DIR_NAME_LENGTH);
}
-
+
public static String toFileSystemSafeName(String name) {
return toFileSystemSafeName(name, false, MAX_FILE_NAME_LENGTH);
}
-
+
/**
* Converts any string into a string that is safe to use as a file name.
* The result will only include ascii characters and numbers, and the "-","_", and "." characters.
*
* @param name
- * @param dirSeparators
- * @param maxFileLength
+ * @param dirSeparators
+ * @param maxFileLength
* @return
*/
public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
@@ -104,8 +103,45 @@ public final class IOHelper {
}
return result;
}
-
- public static boolean deleteFile(File fileToDelete) {
+
+ public static boolean delete(File top) {
+ boolean result = true;
+ Stack<File> files = new Stack<File>();
+ // Add file to the stack to be processed...
+ files.push(top);
+ // Process all files until none remain...
+ while (!files.isEmpty()) {
+ File file = files.pop();
+ if (file.isDirectory()) {
+ File list[] = file.listFiles();
+ if (list == null || list.length == 0) {
+ // The current directory contains no entries...
+ // delete directory and continue...
+ result &= file.delete();
+ } else {
+ // Re-push the non-empty directory so it is revisited (and deleted) after
+ // its entries have been processed. NOTE(review): if any entry fails to
+ // delete, the directory never becomes empty and this loop will not end.
+ files.push(file);
+ for (File dirFile : list) {
+ if (dirFile.isDirectory()) {
+ // Place the directory on the stack...
+ files.push(dirFile);
+ } else {
+ // This is a simple file, delete it...
+ result &= dirFile.delete();
+ }
+ }
+ }
+ } else {
+ // This is a simple file, delete it...
+ result &= file.delete();
+ }
+ }
+ return result;
+ }
+
+ private static boolean deleteFile(File fileToDelete) {
if (fileToDelete == null || !fileToDelete.exists()) {
return true;
}
@@ -113,8 +149,8 @@ public final class IOHelper {
result &= fileToDelete.delete();
return result;
}
-
- public static boolean deleteChildren(File parent) {
+
+ private static boolean deleteChildren(File parent) {
if (parent == null || !parent.exists()) {
return false;
}
@@ -138,23 +174,22 @@ public final class IOHelper {
}
}
}
-
+
return result;
}
-
-
+
public static void moveFile(File src, File targetDirectory) throws IOException {
if (!src.renameTo(new File(targetDirectory, src.getName()))) {
throw new IOException("Failed to move " + src + " to " + targetDirectory);
}
}
-
+
public static void copyFile(File src, File dest) throws IOException {
FileInputStream fileSrc = new FileInputStream(src);
FileOutputStream fileDest = new FileOutputStream(dest);
copyInputStream(fileSrc, fileDest);
}
-
+
public static void copyInputStream(InputStream in, OutputStream out) throws IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int len = in.read(buffer);
@@ -165,19 +200,18 @@ public final class IOHelper {
in.close();
out.close();
}
-
+
static {
- MAX_DIR_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumDirNameLength","200")).intValue();
- MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","64")).intValue();
+ MAX_DIR_NAME_LENGTH = Integer.getInteger("MaximumDirNameLength",200);
+ MAX_FILE_NAME_LENGTH = Integer.getInteger("MaximumFileNameLength",64);
}
-
public static void mkdirs(File dir) throws IOException {
if (dir.exists()) {
if (!dir.isDirectory()) {
throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
}
-
+
} else {
if (!dir.mkdirs()) {
throw new IOException("Failed to create directory '" + dir+"'");
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java Mon Feb 6 22:24:58 2012
@@ -25,19 +25,19 @@ import java.util.Date;
/**
* Used to lock a File.
- *
+ *
* @author chirino
*/
public class LockFile {
-
- private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
+
+ private static final boolean DISABLE_FILE_LOCK = Boolean.getBoolean("java.nio.channels.FileLock.broken");
final private File file;
-
+
private FileLock lock;
private RandomAccessFile readFile;
private int lockCounter;
private final boolean deleteOnUnlock;
-
+
public LockFile(File file, boolean deleteOnUnlock) {
this.file = file;
this.deleteOnUnlock = deleteOnUnlock;
@@ -54,7 +54,7 @@ public class LockFile {
if( lockCounter>0 ) {
return;
}
-
+
IOHelper.mkdirs(file.getParentFile());
if (System.getProperty(getVmLockKey()) != null) {
throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm.");
@@ -80,7 +80,7 @@ public class LockFile {
}
throw new IOException("File '" + file + "' could not be locked.");
}
-
+
}
}
@@ -90,12 +90,12 @@ public class LockFile {
if (DISABLE_FILE_LOCK) {
return;
}
-
+
lockCounter--;
if( lockCounter!=0 ) {
return;
}
-
+
// release the lock..
if (lock != null) {
try {
@@ -106,7 +106,7 @@ public class LockFile {
lock = null;
}
closeReadFile();
-
+
if( deleteOnUnlock ) {
file.delete();
}
@@ -125,7 +125,7 @@ public class LockFile {
}
readFile = null;
}
-
+
}
}
Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java Mon Feb 6 22:24:58 2012
@@ -49,16 +49,15 @@ public abstract class IndexBenchmark ext
public void setUp() throws Exception {
ROOT_DIR = new File(IOHelper.getDefaultDataDirectory());
- IOHelper.mkdirs(ROOT_DIR);
- IOHelper.deleteChildren(ROOT_DIR);
-
+ IOHelper.delete(ROOT_DIR);
+
pf = new PageFile(ROOT_DIR, getClass().getName());
pf.load();
}
protected void tearDown() throws Exception {
Transaction tx = pf.tx();
- for (Index i : indexes.values()) {
+ for (Index<?, ?> i : indexes.values()) {
try {
i.unload(tx);
} catch (Throwable ignore) {
@@ -99,7 +98,7 @@ public abstract class IndexBenchmark ext
try {
Transaction tx = pf.tx();
-
+
Index<String,Long> index = openIndex(name);
long counter = 0;
while (!shutdown.get()) {
@@ -109,7 +108,7 @@ public abstract class IndexBenchmark ext
index.put(tx, key, c);
tx.commit();
Thread.yield(); // This avoids consumer starvation..
-
+
onProduced(counter++);
}
@@ -121,7 +120,7 @@ public abstract class IndexBenchmark ext
public void onProduced(long counter) {
}
}
-
+
protected String key(long c) {
return "a-long-message-id-like-key-" + c;
}
@@ -150,7 +149,7 @@ public abstract class IndexBenchmark ext
while (!shutdown.get()) {
long c = counter;
String key = key(c);
-
+
Long record = index.get(tx, key);
if (record != null) {
if( index.remove(tx, key) == null ) {
Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java Mon Feb 6 22:24:58 2012
@@ -43,9 +43,7 @@ public abstract class IndexTestSupport e
protected void setUp() throws Exception {
super.setUp();
directory = new File(IOHelper.getDefaultDataDirectory());
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
-
+ IOHelper.delete(directory);
}
protected void tearDown() throws Exception {
@@ -54,7 +52,7 @@ public abstract class IndexTestSupport e
pf.delete();
}
}
-
+
protected void createPageFileAndIndex(int pageSize) throws Exception {
pf = new PageFile(directory, getClass().getName());
pf.setPageSize(pageSize);
Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java Mon Feb 6 22:24:58 2012
@@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IOHelper;
public class JournalTest extends TestCase {
protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
Journal dataManager;
File dir;
-
+
@Override
public void setUp() throws Exception {
dir = new File("target/tests/DataFileAppenderTest");
@@ -39,28 +40,16 @@ public class JournalTest extends TestCas
configure(dataManager);
dataManager.start();
}
-
+
protected void configure(Journal dataManager) {
}
@Override
public void tearDown() throws Exception {
dataManager.close();
- deleteFilesInDirectory(dir);
- dir.delete();
+ IOHelper.delete(dir);
}
- private void deleteFilesInDirectory(File directory) {
- File[] files = directory.listFiles();
- for (int i=0; i<files.length; i++) {
- File f = files[i];
- if (f.isDirectory()) {
- deleteFilesInDirectory(f);
- }
- f.delete();
- }
- }
-
public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
final int iterations = 10;
final CountDownLatch latch = new CountDownLatch(iterations);
@@ -68,7 +57,7 @@ public class JournalTest extends TestCas
for (int i=0; i < iterations; i++) {
dataManager.write(data, new Runnable() {
public void run() {
- latch.countDown();
+ latch.countDown();
}
});
}
@@ -84,7 +73,7 @@ public class JournalTest extends TestCas
for (int i=0; i<iterations; i++) {
dataManager.write(data, new Runnable() {
public void run() {
- latch.countDown();
+ latch.countDown();
}
});
}
@@ -92,7 +81,7 @@ public class JournalTest extends TestCas
assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
assertEquals("none written", 0, latch.getCount());
}
-
+
public void testBatchWriteCompleteAfterClose() throws Exception {
ByteSequence data = new ByteSequence("DATA".getBytes());
final int iterations = 10;
@@ -102,27 +91,27 @@ public class JournalTest extends TestCas
dataManager.close();
assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
}
-
+
public void testBatchWriteToMaxMessageSize() throws Exception {
final int iterations = 4;
final CountDownLatch latch = new CountDownLatch(iterations);
Runnable done = new Runnable() {
public void run() {
- latch.countDown();
+ latch.countDown();
}
};
int messageSize = DEFAULT_MAX_BATCH_SIZE / iterations;
byte[] message = new byte[messageSize];
ByteSequence data = new ByteSequence(message);
-
+
for (int i=0; i< iterations; i++) {
dataManager.write(data, done);
}
-
+
// write may take some time
assertTrue("all callbacks complete", latch.await(10, TimeUnit.SECONDS));
}
-
+
public void testNoBatchWriteWithSync() throws Exception {
ByteSequence data = new ByteSequence("DATA".getBytes());
final int iterations = 10;