You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/05 02:02:57 UTC
svn commit: r692306 - in
/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal:
DataFile.java DataFileAppender.java Journal.java NIODataFileAppender.java
ReadOnlyJournal.java
Author: chirino
Date: Thu Sep 4 17:02:57 2008
New Revision: 692306
URL: http://svn.apache.org/viewvc?rev=692306&view=rev
Log:
Updated the journal to use the new LinkedNodeList interfaces.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Thu Sep 4 17:02:57 2008
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
+import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.LinkedNode;
@@ -28,7 +29,7 @@
*
* @version $Revision: 1.1.1.1 $
*/
-public class DataFile extends LinkedNode implements Comparable<DataFile> {
+public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
protected final File file;
protected final Integer dataFileId;
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Thu Sep 4 17:02:57 2008
@@ -25,6 +25,7 @@
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
/**
* An optimized writer to do batch appends to a data file. This object is thread
@@ -40,7 +41,8 @@
protected final Journal dataManager;
protected final Map<WriteKey, WriteCommand> inflightWrites;
- protected final Object enqueueMutex = new Object(){};
+ protected final Object enqueueMutex = new Object() {
+ };
protected WriteBatch nextWriteBatch;
protected boolean shutdown;
@@ -79,13 +81,14 @@
public class WriteBatch {
public final DataFile dataFile;
- public final WriteCommand first;
+
+ public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
public final CountDownLatch latch = new CountDownLatch(1);
public int size;
public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
this.dataFile = dataFile;
- this.first = write;
+ this.writes.addLast(write);
size += write.location.getSize();
}
@@ -100,12 +103,12 @@
}
public void append(WriteCommand write) throws IOException {
- this.first.getTailNode().linkAfter(write);
+ this.writes.addLast(write);
size += write.location.getSize();
}
}
- public static class WriteCommand extends LinkedNode {
+ public static class WriteCommand extends LinkedNode<WriteCommand> {
public final Location location;
public final ByteSequence data;
final boolean sync;
@@ -115,18 +118,17 @@
this.location = location;
this.data = data;
this.sync = sync;
- this.onComplete=null;
+ this.onComplete = null;
}
public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
this.location = location;
this.data = data;
- this.onComplete = onComplete;
+ this.onComplete = onComplete;
this.sync = false;
- }
+ }
}
-
/**
* Construct a Store writer
*
@@ -161,13 +163,14 @@
WriteCommand write = new WriteCommand(location, data, sync);
// Locate datafile and enqueue into the executor in sychronized block so
- // that writes get equeued onto the executor in order that they were assigned
+ // that writes get equeued onto the executor in order that they were
+ // assigned
// by the data manager (which is basically just appending)
synchronized (this) {
// Find the position where this item will land at.
DataFile dataFile = dataManager.allocateLocation(location);
- if( !sync ) {
+ if (!sync) {
inflightWrites.put(new WriteKey(location), write);
}
batch = enqueue(dataFile, write);
@@ -183,8 +186,8 @@
return location;
}
-
- public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+
+ public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
// Write the packet our internal buffer.
int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
@@ -196,7 +199,8 @@
WriteCommand write = new WriteCommand(location, data, onComplete);
// Locate datafile and enqueue into the executor in sychronized block so
- // that writes get equeued onto the executor in order that they were assigned
+ // that writes get equeued onto the executor in order that they were
+ // assigned
// by the data manager (which is basically just appending)
synchronized (this) {
@@ -208,7 +212,7 @@
location.setLatch(batch.latch);
return location;
- }
+ }
private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
@@ -287,13 +291,11 @@
/**
* The async processing loop that writes to the data files and does the
- * force calls.
- *
- * Since the file sync() call is the slowest of all the operations, this
- * algorithm tries to 'batch' or group together several file sync() requests
- * into a single file sync() call. The batching is accomplished attaching
- * the same CountDownLatch instance to every force request in a group.
- *
+ * force calls. Since the file sync() call is the slowest of all the
+ * operations, this algorithm tries to 'batch' or group together several
+ * file sync() requests into a single file sync() call. The batching is
+ * accomplished attaching the same CountDownLatch instance to every force
+ * request in a group.
*/
protected void processQueue() {
DataFile dataFile = null;
@@ -330,21 +332,20 @@
file = dataFile.openRandomAccessFile(true);
}
- WriteCommand write = wb.first;
+ WriteCommand write = wb.writes.getHead();
// Write all the data.
// Only need to seek to first location.. all others
// are in sequence.
file.seek(write.location.getOffset());
-
- boolean forceToDisk=false;
-
+ boolean forceToDisk = false;
+
//
// is it just 1 big write?
if (wb.size == write.location.getSize()) {
- forceToDisk = write.sync | write.onComplete!=null;
-
+ forceToDisk = write.sync | write.onComplete != null;
+
// Just write it directly..
file.writeInt(write.location.getSize());
file.writeByte(write.location.getType());
@@ -357,7 +358,7 @@
// Combine the smaller writes into 1 big buffer
while (write != null) {
- forceToDisk |= write.sync | write.onComplete!=null;
+ forceToDisk |= write.sync | write.onComplete != null;
buff.writeInt(write.location.getSize());
buff.writeByte(write.location.getType());
@@ -366,7 +367,7 @@
buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
buff.write(Journal.ITEM_HEAD_EOR);
- write = (WriteCommand)write.getNext();
+ write = write.getNext();
}
// Now do the 1 big write.
@@ -375,31 +376,31 @@
buff.reset();
}
- if( forceToDisk ) {
+ if (forceToDisk) {
file.getFD().sync();
}
-
- WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+
+ WriteCommand lastWrite = wb.writes.getTail();
dataManager.setLastAppendLocation(lastWrite.location);
// Now that the data is on disk, remove the writes from the in
// flight
// cache.
- write = wb.first;
+ write = wb.writes.getHead();
while (write != null) {
if (!write.sync) {
inflightWrites.remove(new WriteKey(write.location));
}
- if( write.onComplete !=null ) {
- try {
- write.onComplete.run();
- } catch (Throwable e) {
- e.printStackTrace();
- }
+ if (write.onComplete != null) {
+ try {
+ write.onComplete.run();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
}
- write = (WriteCommand)write.getNext();
+ write = write.getNext();
}
-
+
// Signal any waiting threads that the write is on disk.
wb.latch.countDown();
}
@@ -419,5 +420,4 @@
}
}
-
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Thu Sep 4 17:02:57 2008
@@ -42,10 +42,9 @@
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.LinkedNodeList;
import org.apache.kahadb.util.Scheduler;
-
-
/**
* Manages DataFiles
*
@@ -62,8 +61,12 @@
public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
- public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; //
- public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; //
+ public static final byte[] ITEM_HEAD_SOR = new byte[] {
+ 'S', 'O', 'R'
+ }; //
+ public static final byte[] ITEM_HEAD_EOR = new byte[] {
+ 'E', 'O', 'R'
+ }; //
public static final byte DATA_ITEM_TYPE = 1;
public static final byte REDO_ITEM_TYPE = 2;
@@ -79,7 +82,7 @@
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
protected File directory = new File(DEFAULT_DIRECTORY);
- protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
+ protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
protected String filePrefix = DEFAULT_FILE_PREFIX;
protected ControlFile controlFile;
protected boolean started;
@@ -93,18 +96,18 @@
protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
- protected DataFile currentWriteFile;
+ protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
protected Location mark;
protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
protected Runnable cleanupTask;
protected final AtomicLong storeSize;
protected boolean archiveDataLogs;
-
+
public Journal(AtomicLong storeSize) {
- this.storeSize=storeSize;
+ this.storeSize = storeSize;
}
-
+
public Journal() {
this(new AtomicLong());
}
@@ -116,7 +119,7 @@
}
started = true;
- preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
+ preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
lock();
ByteSequence sequence = controlFile.load();
@@ -134,7 +137,7 @@
return dir.equals(directory) && n.startsWith(filePrefix);
}
});
-
+
if (files != null) {
for (int i = 0; i < files.length; i++) {
try {
@@ -154,33 +157,29 @@
// right order.
List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
Collections.sort(l);
- currentWriteFile = null;
for (DataFile df : l) {
- if (currentWriteFile != null) {
- currentWriteFile.linkAfter(df);
- }
- currentWriteFile = df;
+ dataFiles.addLast(df);
fileByFileMap.put(df.getFile(), df);
}
}
// Need to check the current Write File to see if there was a partial
// write to it.
- if (currentWriteFile != null) {
+ if (!dataFiles.isEmpty()) {
// See if the lastSyncedLocation is valid..
Location l = lastAppendLocation.get();
- if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+ if (l != null && l.getDataFileId() != dataFiles.getTail().getDataFileId()) {
l = null;
}
// If we know the last location that was ok.. then we can skip lots
// of checking
- try{
- l = recoveryCheck(currentWriteFile, l);
- lastAppendLocation.set(l);
- }catch(IOException e){
- LOG.warn("recovery check failed", e);
+ try {
+ l = recoveryCheck(dataFiles.getTail(), l);
+ lastAppendLocation.set(l);
+ } catch (IOException e) {
+ LOG.warn("recovery check failed", e);
}
}
@@ -264,25 +263,24 @@
}
synchronized DataFile allocateLocation(Location location) throws IOException {
- if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
- int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
+ if (dataFiles.isEmpty()|| ((dataFiles.getTail().getLength() + location.getSize()) > maxFileLength)) {
+ int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
String fileName = filePrefix + nextNum;
File file = new File(directory, fileName);
DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
- //actually allocate the disk space
+ // actually allocate the disk space
nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
- if (currentWriteFile != null) {
- currentWriteFile.linkAfter(nextWriteFile);
- if (currentWriteFile.isUnused()) {
- removeDataFile(currentWriteFile);
- }
+ dataFiles.addLast(nextWriteFile);
+
+ DataFile previous = dataFiles.getTail().getPrevious();
+ if (previous!=null && previous.isUnused()) {
+ removeDataFile(previous);
}
- currentWriteFile = nextWriteFile;
-
}
+ DataFile currentWriteFile = dataFiles.getTail();
location.setOffset(currentWriteFile.getLength());
location.setDataFileId(currentWriteFile.getDataFileId().intValue());
int size = location.getSize();
@@ -291,9 +289,9 @@
storeSize.addAndGet(size);
return currentWriteFile;
}
-
- public synchronized void removeLocation(Location location) throws IOException{
-
+
+ public synchronized void removeLocation(Location location) throws IOException {
+
DataFile dataFile = getDataFile(location);
dataFile.decrement();
}
@@ -307,19 +305,19 @@
}
return dataFile;
}
-
+
synchronized File getFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
DataFile dataFile = fileMap.get(key);
if (dataFile == null) {
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
- throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
+ throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
}
return dataFile.getFile();
}
private DataFile getNextDataFile(DataFile dataFile) {
- return (DataFile)dataFile.getNext();
+ return dataFile.getNext();
}
public synchronized void close() throws IOException {
@@ -350,8 +348,8 @@
accessorPool.close();
boolean result = true;
- for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
- DataFile dataFile = (DataFile)i.next();
+ for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
+ DataFile dataFile = i.next();
storeSize.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
@@ -359,7 +357,7 @@
fileByFileMap.clear();
lastAppendLocation.set(null);
mark = null;
- currentWriteFile = null;
+ dataFiles = new LinkedNodeList<DataFile>();
// reopen open file handles...
accessorPool = new DataFileAccessorPool(this);
@@ -374,7 +372,7 @@
public synchronized void addInterestInFile(int file) throws IOException {
if (file >= 0) {
Integer key = Integer.valueOf(file);
- DataFile dataFile = (DataFile)fileMap.get(key);
+ DataFile dataFile = fileMap.get(key);
if (dataFile == null) {
throw new IOException("That data file does not exist");
}
@@ -391,10 +389,10 @@
public synchronized void removeInterestInFile(int file) throws IOException {
if (file >= 0) {
Integer key = Integer.valueOf(file);
- DataFile dataFile = (DataFile)fileMap.get(key);
+ DataFile dataFile = fileMap.get(key);
removeInterestInFile(dataFile);
}
-
+
}
synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
@@ -405,18 +403,18 @@
}
}
- public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
+ public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer> inProgress) throws IOException {
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse);
unUsed.removeAll(inProgress);
-
+
List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) {
- DataFile dataFile = (DataFile)fileMap.get(key);
+ DataFile dataFile = fileMap.get(key);
purgeList.add(dataFile);
}
for (DataFile dataFile : purgeList) {
- if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
+ if (dataFile != dataFiles.getTail()) {
forceRemoveDataFile(dataFile);
}
}
@@ -425,19 +423,19 @@
public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse);
-
+
List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) {
- // Only add files less than the lastFile..
- if( key.intValue() < lastFile.intValue() ) {
- DataFile dataFile = (DataFile)fileMap.get(key);
+ // Only add files less than the lastFile..
+ if (key.intValue() < lastFile.intValue()) {
+ DataFile dataFile = fileMap.get(key);
purgeList.add(dataFile);
- }
+ }
}
for (DataFile dataFile : purgeList) {
forceRemoveDataFile(dataFile);
}
- }
+ }
public synchronized void consolidateDataFiles() throws IOException {
List<DataFile> purgeList = new ArrayList<DataFile>();
@@ -454,28 +452,25 @@
private synchronized void removeDataFile(DataFile dataFile) throws IOException {
// Make sure we don't delete too much data.
- if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
+ if (dataFile == dataFiles.getTail() || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
LOG.debug("Won't remove DataFile" + dataFile);
- return;
+ return;
}
forceRemoveDataFile(dataFile);
}
-
- private synchronized void forceRemoveDataFile(DataFile dataFile)
- throws IOException {
+
+ private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
fileByFileMap.remove(dataFile.getFile());
- DataFile removed = fileMap.remove(dataFile.getDataFileId());
+ fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
if (archiveDataLogs) {
dataFile.move(getDirectoryArchive());
- LOG.info("moved data file " + dataFile + " to "
- + getDirectoryArchive());
+ LOG.info("moved data file " + dataFile + " to " + getDirectoryArchive());
} else {
boolean result = dataFile.delete();
- LOG.info("discarding data file " + dataFile
- + (result ? "successful " : "failed"));
+ LOG.info("discarding data file " + dataFile + (result ? "successful " : "failed"));
}
}
@@ -507,18 +502,18 @@
while (true) {
if (cur == null) {
if (location == null) {
- DataFile head = (DataFile)currentWriteFile.getHeadNode();
+ DataFile head = dataFiles.getHead();
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
} else {
// Set to the next offset..
- if( location.getSize() == -1 ) {
- cur = new Location(location);
- } else {
- cur = new Location(location);
- cur.setOffset(location.getOffset()+location.getSize());
- }
+ if (location.getSize() == -1) {
+ cur = new Location(location);
+ } else {
+ cur = new Location(location);
+ cur.setOffset(location.getOffset() + location.getSize());
+ }
}
} else {
cur.setOffset(cur.getOffset() + cur.getSize());
@@ -553,20 +548,19 @@
}
}
}
-
- public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
+
+ public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
DataFile df = fileByFileMap.get(file);
- return getNextLocation(df, lastLocation,thisFileOnly);
+ return getNextLocation(df, lastLocation, thisFileOnly);
}
-
- public synchronized Location getNextLocation(DataFile dataFile,
- Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
+
+ public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
Location cur = null;
while (true) {
if (cur == null) {
if (lastLocation == null) {
- DataFile head = (DataFile)dataFile.getHeadNode();
+ DataFile head = dataFile.getHeadNode();
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
@@ -579,19 +573,18 @@
cur.setOffset(cur.getOffset() + cur.getSize());
}
-
// Did it go into the next file??
if (dataFile.getLength() <= cur.getOffset()) {
if (thisFileOnly) {
return null;
- }else {
- dataFile = getNextDataFile(dataFile);
- if (dataFile == null) {
- return null;
} else {
- cur.setDataFileId(dataFile.getDataFileId().intValue());
- cur.setOffset(0);
- }
+ dataFile = getNextDataFile(dataFile);
+ if (dataFile == null) {
+ return null;
+ } else {
+ cur.setDataFileId(dataFile.getDataFileId().intValue());
+ cur.setOffset(0);
+ }
}
}
@@ -641,7 +634,7 @@
Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
return loc;
}
-
+
public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
return loc;
@@ -689,22 +682,22 @@
this.lastAppendLocation.set(lastSyncedLocation);
}
- public boolean isUseNio() {
- return useNio;
- }
-
- public void setUseNio(boolean useNio) {
- this.useNio = useNio;
- }
-
- public File getDirectoryArchive() {
+ public boolean isUseNio() {
+ return useNio;
+ }
+
+ public void setUseNio(boolean useNio) {
+ this.useNio = useNio;
+ }
+
+ public File getDirectoryArchive() {
return directoryArchive;
}
public void setDirectoryArchive(File directoryArchive) {
this.directoryArchive = directoryArchive;
}
-
+
public boolean isArchiveDataLogs() {
return archiveDataLogs;
}
@@ -714,40 +707,41 @@
}
synchronized public Integer getCurrentDataFileId() {
- if( currentWriteFile==null )
+ if (dataFiles.isEmpty())
return null;
- return currentWriteFile.getDataFileId();
+ return dataFiles.getTail().getDataFileId();
}
-
+
/**
* Get a set of files - only valid after start()
+ *
* @return files currently being used
*/
- public Set<File> getFiles(){
+ public Set<File> getFiles() {
return fileByFileMap.keySet();
}
- synchronized public long getDiskSize() {
- long rc=0;
- DataFile cur = (DataFile)currentWriteFile.getHeadNode();
- while( cur !=null ) {
- rc += cur.getLength();
- cur = (DataFile) cur.getNext();
- }
- return rc;
- }
-
- synchronized public long getDiskSizeUntil(Location startPosition) {
- long rc=0;
- DataFile cur = (DataFile)currentWriteFile.getHeadNode();
- while( cur !=null ) {
- if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
- return rc + startPosition.getOffset();
- }
- rc += cur.getLength();
- cur = (DataFile) cur.getNext();
+ synchronized public long getDiskSize() {
+ long rc = 0;
+ DataFile cur = dataFiles.getHead();
+ while (cur != null) {
+ rc += cur.getLength();
+ cur = cur.getNext();
}
- return rc;
- }
+ return rc;
+ }
+
+ synchronized public long getDiskSizeUntil(Location startPosition) {
+ long rc = 0;
+ DataFile cur = dataFiles.getHead();
+ while (cur != null) {
+ if (cur.getDataFileId().intValue() >= startPosition.getDataFileId()) {
+ return rc + startPosition.getOffset();
+ }
+ rc += cur.getLength();
+ cur = cur.getNext();
+ }
+ return rc;
+ }
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java Thu Sep 4 17:02:57 2008
@@ -91,7 +91,7 @@
channel = file.getChannel();
}
- WriteCommand write = wb.first;
+ WriteCommand write = wb.writes.getHead();
// Write all the data.
// Only need to seek to first location.. all others
@@ -139,7 +139,7 @@
copy(footer, buffer);
assert !footer.hasRemaining();
- write = (WriteCommand)write.getNext();
+ write = write.getNext();
}
// Fully write out the buffer..
@@ -152,13 +152,13 @@
file.getChannel().force(false);
}
- WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+ WriteCommand lastWrite = wb.writes.getTail();
dataManager.setLastAppendLocation(lastWrite.location);
// Now that the data is on disk, remove the writes from the in
// flight
// cache.
- write = wb.first;
+ write = wb.writes.getHead();
while (write != null) {
if (!write.sync) {
inflightWrites.remove(new WriteKey(write.location));
@@ -170,7 +170,7 @@
e.printStackTrace();
}
}
- write = (WriteCommand)write.getNext();
+ write = write.getNext();
}
// Signal any waiting threads that the write is on disk.
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java Thu Sep 4 17:02:57 2008
@@ -25,8 +25,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.Scheduler;
/**
* An AsyncDataManager that works in read only mode against multiple data directories.
@@ -77,27 +75,32 @@
// Sort the list so that we can link the DataFiles together in the
// right order.
- List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
- Collections.sort(dataFiles);
- currentWriteFile = null;
- for (DataFile df : dataFiles) {
- if (currentWriteFile != null) {
- currentWriteFile.linkAfter(df);
- }
- currentWriteFile = df;
+ List<DataFile> list = new ArrayList<DataFile>(fileMap.values());
+ Collections.sort(list);
+ for (DataFile df : list) {
+ dataFiles.addLast(df);
fileByFileMap.put(df.getFile(), df);
}
- // Need to check the current Write File to see if there was a partial
- // write to it.
- if (currentWriteFile != null) {
-
- // See if the lastSyncedLocation is valid..
- Location l = lastAppendLocation.get();
- if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
- l = null;
- }
- }
+// // Need to check the current Write File to see if there was a partial
+// // write to it.
+// if (!dataFiles.isEmpty()) {
+//
+// // See if the lastSyncedLocation is valid..
+// Location l = lastAppendLocation.get();
+// if (l != null && l.getDataFileId() != dataFiles.getTail().getDataFileId().intValue()) {
+// l = null;
+// }
+//
+// // If we know the last location that was ok.. then we can skip lots
+// // of checking
+// try {
+// l = recoveryCheck(dataFiles.getTail(), l);
+// lastAppendLocation.set(l);
+// } catch (IOException e) {
+// LOG.warn("recovery check failed", e);
+// }
+// }
}
public synchronized void close() throws IOException {
@@ -112,11 +115,11 @@
public Location getFirstLocation() throws IllegalStateException, IOException {
- if( currentWriteFile == null ) {
+ if( dataFiles.isEmpty() ) {
return null;
}
- DataFile first = (DataFile)currentWriteFile.getHeadNode();
+ DataFile first = dataFiles.getHead();
Location cur = new Location();
cur.setDataFileId(first.getDataFileId());
cur.setOffset(0);