You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by el...@apache.org on 2012/08/15 07:30:04 UTC
svn commit: r1373216 - in
/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree:
BTree.java BTreeConfiguration.java
Author: elecharny
Date: Wed Aug 15 05:30:03 2012
New Revision: 1373216
URL: http://svn.apache.org/viewvc?rev=1373216&view=rev
Log:
o Added a first drop of modifications to handle a journal of modifications, when not using the in-memory BTree
o Simplified the configuration of the file and journal names
Modified:
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java
Modified: labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
URL: http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java?rev=1373216&r1=1373215&r2=1373216&view=diff
==============================================================================
--- labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java (original)
+++ labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java Wed Aug 15 05:30:03 2012
@@ -20,6 +20,7 @@
package org.apache.mavibot.btree;
+import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -30,7 +31,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Comparator;
import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -52,8 +55,11 @@ public class BTree<K, V>
/** Default page size (number of entries per node) */
public static final int DEFAULT_PAGE_SIZE = 16;
+ /** The default journal name */
+ public static final String DEFAULT_JOURNAL = "mavibot.log";
+
/** A field used to generate new revisions in a thread safe way */
- private AtomicLong revision = new AtomicLong( 0 );
+ private AtomicLong revision;
/** A field used to generate new recordId in a thread safe way */
private transient AtomicLong pageRecordIdGenerator;
@@ -65,7 +71,6 @@ public class BTree<K, V>
protected volatile Page<K, V> rootPage;
/** The list of read transactions being executed */
- //private ConcurrentDoublyLinkedList<Transaction<K, V>> readTransactions;
private ConcurrentLinkedQueue<Transaction<K, V>> readTransactions;
/** Number of entries in each Page. */
@@ -82,8 +87,14 @@ public class BTree<K, V>
/** The associated file. If null, this is an in-memory btree */
private File file;
+ /** A flag set to true when the BTree is an in-memory BTree */
+ private boolean inMemory;
+
+ /** The associated journal. If null, this is an in-memory btree */
+ private File journal;
+
/** The number of elements in the current revision */
- private AtomicLong nbElems = new AtomicLong( 0 );
+ private AtomicLong nbElems;
/** A lock used to protect the write operation against concurrent access */
private final ReentrantLock writeLock = new ReentrantLock();
@@ -91,12 +102,18 @@ public class BTree<K, V>
/** The thread responsible for the cleanup of timed out reads */
private Thread readTransactionsThread;
+ /** The thread responsible for the journal updates */
+ private Thread journalManagerThread;
+
/** Define a default delay for a read transaction. This is 10 seconds */
public static final long DEFAULT_READ_TIMEOUT = 10 * 1000L;
/** The read transaction timeout */
private long readTimeOut = DEFAULT_READ_TIMEOUT;
+ /** The queue containing all the modifications applied on the bTree */
+ private BlockingQueue<Modification<K, V>> modificationsQueue;
+
/**
* Create a thread that is responsible of cleaning the transactions when
@@ -160,6 +177,76 @@ public class BTree<K, V>
/**
+ * Create a thread that is responsible for writing the modifications to a journal.
+ * The journal will contain all the modifications in the order they have been applied
+ * to the BTree. We will store Insertions and Deletions. Those operations are injected
+ * into a queue, which is read by the thread.
+ */
+ private void createJournalManager()
+ {
+ Runnable journalTask = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ FileOutputStream stream = new FileOutputStream( journal );
+ FileChannel channel = stream.getChannel();
+
+ while ( !Thread.currentThread().isInterrupted() )
+ {
+ Modification<K, V> modification = modificationsQueue.take();
+
+ if ( modification instanceof Addition )
+ {
+ byte[] keyBuffer = serializer.serializeKey( modification.getKey() );
+ ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
+ bb.put( Modification.ADDITION );
+ bb.put( keyBuffer );
+ bb.flip();
+
+ channel.write( bb );
+
+ byte[] valueBuffer = serializer.serializeValue( modification.getValue() );
+ bb = ByteBuffer.allocateDirect( valueBuffer.length );
+ bb.put( valueBuffer );
+ bb.flip();
+
+ channel.write( bb );
+ }
+ else
+ {
+ byte[] keyBuffer = serializer.serializeKey( modification.getKey() );
+ ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
+ bb.put( Modification.DELETION );
+ bb.put( keyBuffer );
+ bb.flip();
+
+ channel.write( bb );
+ }
+
+ // Flush to the disk for real
+ channel.force( true );
+ }
+ }
+ catch ( InterruptedException ie )
+ {
+ //System.out.println( "Interrupted" );
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException( e );
+ }
+ }
+ };
+
+ journalManagerThread = new Thread( journalTask );
+ journalManagerThread.setDaemon( true );
+ journalManagerThread.start();
+ }
+
+
+ /**
* Creates a new in-memory BTree using the BTreeConfiguration to initialize the
* BTree
*
@@ -167,9 +254,11 @@ public class BTree<K, V>
*/
public BTree( BTreeConfiguration<K, V> configuration ) throws IOException
{
- String fileName = configuration.getFilePrefix() + "." + configuration.getFileSuffix();
+ String fileName = configuration.getFileName();
+ String journalName = configuration.getJournalName();
File btreeFile = new File( configuration.getFilePath(), fileName );
+ File journalFile = new File( configuration.getJournalPath(), journalName );
pageSize = configuration.getPageSize();
comparator = configuration.getComparator();
@@ -181,21 +270,11 @@ public class BTree<K, V>
throw new IllegalArgumentException( "Comparator should not be null" );
}
+ file = btreeFile;
+ journal = journalFile;
+
// Now, initialize the BTree
init();
-
- // Last, we load the data from the file, if it exists.
- if ( btreeFile.exists() )
- {
- // The file already exists, load it.
- file = btreeFile;
- load( file );
- }
- else
- {
- // We will create the new file
- file = btreeFile;
- }
}
@@ -319,9 +398,19 @@ public class BTree<K, V>
public void init() throws IOException
{
// Create the queue containing the pending read transactions
- //readTransactions = new ConcurrentDoublyLinkedList<Transaction<K, V>>();
readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
+ // Create the queue containing the modifications, if it's not an in-memory btree
+ if ( file != null )
+ {
+ modificationsQueue = new LinkedBlockingDeque<Modification<K, V>>();
+ inMemory = false;
+ }
+ else
+ {
+ inMemory = true;
+ }
+
// Initialize the PageId counter
pageRecordIdGenerator = new AtomicLong( 0 );
@@ -342,18 +431,75 @@ public class BTree<K, V>
keyType = ( Class<?> ) argumentTypes[0];
}
+ nbElems = new AtomicLong( 0 );
+
+ // Check the files and create them if missing
+ if ( file != null )
+ {
+ if ( !file.exists() )
+ {
+ file.createNewFile();
+
+ if ( journal == null )
+ {
+ journal = new File( file.getParentFile(), BTree.DEFAULT_JOURNAL );
+ }
+
+ journal.createNewFile();
+ }
+ else
+ {
+ if ( file.length() > 0 )
+ {
+ // We have some existing file, load it
+ load( file );
+ }
+
+ if ( journal == null )
+ {
+ journal = new File( file.getParentFile(), BTree.DEFAULT_JOURNAL );
+ }
+
+ journal.createNewFile();
+
+ // If the journal is not empty, we have to read it
+ // and apply all the modifications to the current file
+ if ( journal.length() > 0 )
+ {
+ applyJournal();
+ }
+ }
+ }
+
// Initialize the txnManager thread
createTransactionManager();
+
+ // Initialize the Journal manager thread if it's not an in-memory btree
+ if ( !inMemory )
+ {
+ createJournalManager();
+ }
}
/**
* Close the BTree, cleaning up all the data structure
*/
- public void close()
+ public void close() throws IOException
{
+ // Stop the readTransaction thread
readTransactionsThread.interrupt();
readTransactions.clear();
+
+ if ( !inMemory )
+ {
+ // Stop the journal manager thread
+ journalManagerThread.interrupt();
+
+ // Flush the data
+ flush();
+ }
+
rootPage = null;
}
@@ -538,6 +684,12 @@ public class BTree<K, V>
tuple = removeResult.getRemovedElement();
}
+ if ( !inMemory )
+ {
+ // Inject the modification into the modification queue
+ modificationsQueue.add( new Deletion<K, V>( key ) );
+ }
+
// Return the value we have found if it was modified
return tuple;
}
@@ -666,6 +818,12 @@ public class BTree<K, V>
rootPage = new Node<K, V>( this, revision, pivot, leftPage, rightPage );
}
+ // Inject the modification into the modification queue
+ if ( !inMemory )
+ {
+ modificationsQueue.add( new Addition<K, V>( key, value ) );
+ }
+
// Return the value we have found if it was modified
return modifiedValue;
}
@@ -774,7 +932,17 @@ public class BTree<K, V>
*/
public void flush( File file ) throws IOException
{
- File baseDirectory = new File( file.getParentFile().getAbsolutePath() );
+ File parentFile = file.getParentFile();
+ File baseDirectory = null;
+
+ if ( parentFile != null )
+ {
+ baseDirectory = new File( file.getParentFile().getAbsolutePath() );
+ }
+ else
+ {
+ baseDirectory = new File( "." );
+ }
// Create a temporary file in the same directory to flush the current btree
File tmpFileFD = File.createTempFile( "mavibot", null, baseDirectory );
@@ -796,11 +964,6 @@ public class BTree<K, V>
{
Tuple<K, V> tuple = cursor.next();
- if ( bb.remaining() == 0 )
- {
-
- }
-
byte[] keyBuffer = serializer.serializeKey( tuple.getKey() );
writeBuffer( ch, bb, keyBuffer );
@@ -833,6 +996,68 @@ public class BTree<K, V>
}
+ /**
+ * Inject all the modifications from the journal into the btree
+ *
+ * @throws IOException If we had some issue while reading the journal
+ */
+ private void applyJournal() throws IOException
+ {
+ long revision = generateRevision();
+
+ if ( !journal.exists() )
+ {
+ throw new IOException( "The journal does not exist" );
+ }
+
+ FileChannel channel =
+ new RandomAccessFile( journal, "rw" ).getChannel();
+ ByteBuffer buffer = ByteBuffer.allocate( 65536 );
+
+ BufferHandler bufferHandler = new BufferHandler( channel, buffer );
+
+ // Loop on all the elements, store them in lists atm
+ try
+ {
+ while ( true )
+ {
+ // Read the type
+ byte[] type = bufferHandler.read( 1 );
+
+ if ( type[0] == Modification.ADDITION )
+ {
+ // Read the key
+ K key = serializer.deserializeKey( bufferHandler );
+
+ //keys.add( key );
+
+ // Read the value
+ V value = serializer.deserializeValue( bufferHandler );
+
+ //values.add( value );
+
+ // Inject the data in the tree. (to be replaced by a bulk load)
+ insert( key, value, revision );
+ }
+ else
+ {
+ // Read the key
+ K key = serializer.deserializeKey( bufferHandler );
+
+ // Remove the key from the tree
+ delete( key, revision );
+ }
+ }
+ }
+ catch ( EOFException eofe )
+ {
+ // Done reading the journal. Delete it and recreate a new one
+ journal.delete();
+ journal.createNewFile();
+ }
+ }
+
+
/**
* Read the data from the disk into this BTree. All the existing data in the
* BTree are kept, the read data will be associated with a new revision.
@@ -855,6 +1080,7 @@ public class BTree<K, V>
BufferHandler bufferHandler = new BufferHandler( channel, buffer );
long nbElems = LongSerializer.deserialize( bufferHandler.read( 8 ) );
+ this.nbElems.set( nbElems );
// Prepare a list of keys and values read from the disk
//List<K> keys = new ArrayList<K>();
@@ -888,7 +1114,14 @@ public class BTree<K, V>
*/
public void flush() throws IOException
{
- flush( file );
+ if ( !inMemory )
+ {
+ // Then flush the file
+ flush( file );
+
+ // And delete the journal
+ journal.delete();
+ }
}
Modified: labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java
URL: http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java?rev=1373216&r1=1373215&r2=1373216&view=diff
==============================================================================
--- labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java (original)
+++ labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java Wed Aug 15 05:30:03 2012
@@ -53,14 +53,9 @@ public class BTreeConfiguration<K, V>
private String filePath;
/**
- * The BTree file's name. Default to "mavibot".
+ * The BTree file's name.
*/
- private String filePrefix = "mavibot";
-
- /**
- * The BTree file's suffix. Default to "data".
- */
- private String fileSuffix = "data";
+ private String fileName;
/**
* The maximum delay to wait before a revision is considered as unused.
@@ -83,16 +78,12 @@ public class BTreeConfiguration<K, V>
private String journalPath;
/**
- * The journal's name. Default to "mavibot".
- */
- private String journalPrefix = "mavibot";
-
- /**
- * The journal's suffix. Default to "log".
+ * The journal's name. Defaults to "mavibot.log".
*/
- private String journalSuffix = "log";
+ private String journalName = BTree.DEFAULT_JOURNAL;
- /** The delay between two checkpoints. When we reach the maximum delay,
+ /**
+ * The delay between two checkpoints. When we reach the maximum delay,
* the BTree is flushed on disk, but only if we have had some modifications.
* The default value is 60 seconds.
*/
@@ -226,38 +217,20 @@ public class BTreeConfiguration<K, V>
/**
- * @return the filePrefix
- */
- public String getFilePrefix()
- {
- return filePrefix;
- }
-
-
- /**
- * @param filePrefix the filePrefix to set
+ * @return the file name
*/
- public void setFilePrefix( String filePrefix )
+ public String getFileName()
{
- this.filePrefix = filePrefix;
+ return fileName;
}
/**
- * @return the fileSuffix
+ * @param fileName the file name to set
*/
- public String getFileSuffix()
+ public void setFileName( String fileName )
{
- return fileSuffix;
- }
-
-
- /**
- * @param fileSuffix the fileSuffix to set
- */
- public void setFileSuffix( String fileSuffix )
- {
- this.fileSuffix = fileSuffix;
+ this.fileName = fileName;
}
@@ -280,37 +253,19 @@ public class BTreeConfiguration<K, V>
/**
- * @return the journalPrefix
- */
- public String getJournalPrefix()
- {
- return journalPrefix;
- }
-
-
- /**
- * @param journalPrefix the journalPrefix to set
- */
- public void setJournalPrefix( String journalPrefix )
- {
- this.journalPrefix = journalPrefix;
- }
-
-
- /**
- * @return the journalSuffix
+ * @return the journal name
*/
- public String getJournalSuffix()
+ public String getJournalName()
{
- return journalSuffix;
+ return journalName;
}
/**
- * @param journalSuffix the journalSuffix to set
+ * @param journalName the journal name to set
*/
- public void setJournalSuffix( String journalSuffix )
+ public void setJournalName( String journalName )
{
- this.journalSuffix = journalSuffix;
+ this.journalName = journalName;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org