You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by el...@apache.org on 2012/08/15 07:30:04 UTC

svn commit: r1373216 - in /labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree: BTree.java BTreeConfiguration.java

Author: elecharny
Date: Wed Aug 15 05:30:03 2012
New Revision: 1373216

URL: http://svn.apache.org/viewvc?rev=1373216&view=rev
Log:
o Added a first drop of modifications to handle a journal of modifications, when not using the in-memory BTree
o Simplified the configuration of the file and journal names

Modified:
    labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
    labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java

Modified: labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
URL: http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java?rev=1373216&r1=1373215&r2=1373216&view=diff
==============================================================================
--- labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java (original)
+++ labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java Wed Aug 15 05:30:03 2012
@@ -20,6 +20,7 @@
 package org.apache.mavibot.btree;
 
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -30,7 +31,9 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Comparator;
 import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -52,8 +55,11 @@ public class BTree<K, V>
     /** Default page size (number of entries per node) */
     public static final int DEFAULT_PAGE_SIZE = 16;
 
+    /** The default journal name */
+    public static final String DEFAULT_JOURNAL = "mavibot.log";
+
     /** A field used to generate new revisions in a thread safe way */
-    private AtomicLong revision = new AtomicLong( 0 );
+    private AtomicLong revision;
 
     /** A field used to generate new recordId in a thread safe way */
     private transient AtomicLong pageRecordIdGenerator;
@@ -65,7 +71,6 @@ public class BTree<K, V>
     protected volatile Page<K, V> rootPage;
 
     /** The list of read transactions being executed */
-    //private ConcurrentDoublyLinkedList<Transaction<K, V>> readTransactions;
     private ConcurrentLinkedQueue<Transaction<K, V>> readTransactions;
 
     /** Number of entries in each Page. */
@@ -82,8 +87,14 @@ public class BTree<K, V>
     /** The associated file. If null, this is an in-memory btree  */
     private File file;
 
+    /** A flag set to true when the BTree is a in-memory BTree */
+    private boolean inMemory;
+
+    /** The associated journal. If null, this is an in-memory btree  */
+    private File journal;
+
     /** The number of elements in the current revision */
-    private AtomicLong nbElems = new AtomicLong( 0 );
+    private AtomicLong nbElems;
 
     /** A lock used to protect the write operation against concurrent access */
     private final ReentrantLock writeLock = new ReentrantLock();
@@ -91,12 +102,18 @@ public class BTree<K, V>
     /** The thread responsible for the cleanup of timed out reads */
     private Thread readTransactionsThread;
 
+    /** The thread responsible for the journal updates */
+    private Thread journalManagerThread;
+
     /** Define a default delay for a read transaction. This is 10 seconds */
     public static final long DEFAULT_READ_TIMEOUT = 10 * 1000L;
 
     /** The read transaction timeout */
     private long readTimeOut = DEFAULT_READ_TIMEOUT;
 
+    /** The queue containing all the modifications applied on the bTree */
+    private BlockingQueue<Modification<K, V>> modificationsQueue;
+
 
     /**
      * Create a thread that is responsible of cleaning the transactions when
@@ -160,6 +177,76 @@ public class BTree<K, V>
 
 
     /**
+     * Create a thread that is responsible of writing the modifications in a journal.
+     * The journal will contain all the modifications in the order they have been applied
+     * to the BTree. We will store Insertions and Deletions. Those operations are injected
+     * into a queue, which is read by the thread.
+     */
+    private void createJournalManager()
+    {
+        Runnable journalTask = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    FileOutputStream stream = new FileOutputStream( journal );
+                    FileChannel channel = stream.getChannel();
+
+                    while ( !Thread.currentThread().isInterrupted() )
+                    {
+                        Modification<K, V> modification = modificationsQueue.take();
+
+                        if ( modification instanceof Addition )
+                        {
+                            byte[] keyBuffer = serializer.serializeKey( modification.getKey() );
+                            ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
+                            bb.put( Modification.ADDITION );
+                            bb.put( keyBuffer );
+                            bb.flip();
+
+                            channel.write( bb );
+
+                            byte[] valueBuffer = serializer.serializeValue( modification.getValue() );
+                            bb = ByteBuffer.allocateDirect( valueBuffer.length );
+                            bb.put( valueBuffer );
+                            bb.flip();
+
+                            channel.write( bb );
+                        }
+                        else
+                        {
+                            byte[] keyBuffer = serializer.serializeKey( modification.getKey() );
+                            ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
+                            bb.put( Modification.DELETION );
+                            bb.put( keyBuffer );
+                            bb.flip();
+
+                            channel.write( bb );
+                        }
+
+                        // Flush to the disk for real
+                        channel.force( true );
+                    }
+                }
+                catch ( InterruptedException ie )
+                {
+                    //System.out.println( "Interrupted" );
+                }
+                catch ( Exception e )
+                {
+                    throw new RuntimeException( e );
+                }
+            }
+        };
+
+        journalManagerThread = new Thread( journalTask );
+        journalManagerThread.setDaemon( true );
+        journalManagerThread.start();
+    }
+
+
+    /**
      * Creates a new in-memory BTree using the BTreeConfiguration to initialize the 
      * BTree
      * 
@@ -167,9 +254,11 @@ public class BTree<K, V>
      */
     public BTree( BTreeConfiguration<K, V> configuration ) throws IOException
     {
-        String fileName = configuration.getFilePrefix() + "." + configuration.getFileSuffix();
+        String fileName = configuration.getFileName();
+        String journalName = configuration.getJournalName();
 
         File btreeFile = new File( configuration.getFilePath(), fileName );
+        File journalFile = new File( configuration.getJournalPath(), journalName );
 
         pageSize = configuration.getPageSize();
         comparator = configuration.getComparator();
@@ -181,21 +270,11 @@ public class BTree<K, V>
             throw new IllegalArgumentException( "Comparator should not be null" );
         }
 
+        file = btreeFile;
+        journal = journalFile;
+
         // Now, initialize the BTree
         init();
-
-        // Last, we load the data from the file, if it exists.
-        if ( btreeFile.exists() )
-        {
-            // The file already exists, load it.
-            file = btreeFile;
-            load( file );
-        }
-        else
-        {
-            // We will create the new file
-            file = btreeFile;
-        }
     }
 
 
@@ -319,9 +398,19 @@ public class BTree<K, V>
     public void init() throws IOException
     {
         // Create the queue containing the pending read transactions
-        //readTransactions = new ConcurrentDoublyLinkedList<Transaction<K, V>>();
         readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
 
+        // Create the queue containing the modifications, if it's not a in-memory btree
+        if ( file != null )
+        {
+            modificationsQueue = new LinkedBlockingDeque<Modification<K, V>>();
+            inMemory = false;
+        }
+        else
+        {
+            inMemory = true;
+        }
+
         // Initialize the PageId counter
         pageRecordIdGenerator = new AtomicLong( 0 );
 
@@ -342,18 +431,75 @@ public class BTree<K, V>
             keyType = ( Class<?> ) argumentTypes[0];
         }
 
+        nbElems = new AtomicLong( 0 );
+
+        // Check the files and create them if missing
+        if ( file != null )
+        {
+            if ( !file.exists() )
+            {
+                file.createNewFile();
+
+                if ( journal == null )
+                {
+                    journal = new File( file.getParentFile(), BTree.DEFAULT_JOURNAL );
+                }
+
+                journal.createNewFile();
+            }
+            else
+            {
+                if ( file.length() > 0 )
+                {
+                    // We have some existing file, load it 
+                    load( file );
+                }
+
+                if ( journal == null )
+                {
+                    journal = new File( file.getParentFile(), BTree.DEFAULT_JOURNAL );
+                }
+
+                journal.createNewFile();
+
+                // If the journal is not empty, we have to read it
+                // and to apply all the modifications to the current file
+                if ( journal.length() > 0 )
+                {
+                    applyJournal();
+                }
+            }
+        }
+
         // Initialize the txnManager thread
         createTransactionManager();
+
+        // Initialize the Journal manager thread if it's not a in-memory btree
+        if ( !inMemory )
+        {
+            createJournalManager();
+        }
     }
 
 
     /**
      * Close the BTree, cleaning up all the data structure
      */
-    public void close()
+    public void close() throws IOException
     {
+        // Stop the readTransaction thread
         readTransactionsThread.interrupt();
         readTransactions.clear();
+
+        if ( !inMemory )
+        {
+            // Stop the journal manager thread
+            journalManagerThread.interrupt();
+
+            // Flush the data
+            flush();
+        }
+
         rootPage = null;
     }
 
@@ -538,6 +684,12 @@ public class BTree<K, V>
                 tuple = removeResult.getRemovedElement();
             }
 
+            if ( !inMemory )
+            {
+                // Inject the modification into the modification queue
+                modificationsQueue.add( new Deletion<K, V>( key ) );
+            }
+
             // Return the value we have found if it was modified
             return tuple;
         }
@@ -666,6 +818,12 @@ public class BTree<K, V>
                 rootPage = new Node<K, V>( this, revision, pivot, leftPage, rightPage );
             }
 
+            // Inject the modification into the modification queue
+            if ( !inMemory )
+            {
+                modificationsQueue.add( new Addition<K, V>( key, value ) );
+            }
+
             // Return the value we have found if it was modified
             return modifiedValue;
         }
@@ -774,7 +932,17 @@ public class BTree<K, V>
      */
     public void flush( File file ) throws IOException
     {
-        File baseDirectory = new File( file.getParentFile().getAbsolutePath() );
+        File parentFile = file.getParentFile();
+        File baseDirectory = null;
+
+        if ( parentFile != null )
+        {
+            baseDirectory = new File( file.getParentFile().getAbsolutePath() );
+        }
+        else
+        {
+            baseDirectory = new File( "." );
+        }
 
         // Create a temporary file in the same directory to flush the current btree
         File tmpFileFD = File.createTempFile( "mavibot", null, baseDirectory );
@@ -796,11 +964,6 @@ public class BTree<K, V>
         {
             Tuple<K, V> tuple = cursor.next();
 
-            if ( bb.remaining() == 0 )
-            {
-
-            }
-
             byte[] keyBuffer = serializer.serializeKey( tuple.getKey() );
 
             writeBuffer( ch, bb, keyBuffer );
@@ -833,6 +996,68 @@ public class BTree<K, V>
     }
 
 
+    /** 
+     * Inject all the modification from the journal into the btree
+     * 
+     * @throws IOException If we had some issue while reading the journal
+     */
+    private void applyJournal() throws IOException
+    {
+        long revision = generateRevision();
+
+        if ( !journal.exists() )
+        {
+            throw new IOException( "The journal does not exist" );
+        }
+
+        FileChannel channel =
+            new RandomAccessFile( journal, "rw" ).getChannel();
+        ByteBuffer buffer = ByteBuffer.allocate( 65536 );
+
+        BufferHandler bufferHandler = new BufferHandler( channel, buffer );
+
+        // Loop on all the elements, store them in lists atm
+        try
+        {
+            while ( true )
+            {
+                // Read the type 
+                byte[] type = bufferHandler.read( 1 );
+
+                if ( type[0] == Modification.ADDITION )
+                {
+                    // Read the key
+                    K key = serializer.deserializeKey( bufferHandler );
+
+                    //keys.add( key );
+
+                    // Read the value
+                    V value = serializer.deserializeValue( bufferHandler );
+
+                    //values.add( value );
+
+                    // Inject the data in the tree. (to be replaced by a bulk load)
+                    insert( key, value, revision );
+                }
+                else
+                {
+                    // Read the key
+                    K key = serializer.deserializeKey( bufferHandler );
+
+                    // Remove the key from the tree
+                    delete( key, revision );
+                }
+            }
+        }
+        catch ( EOFException eofe )
+        {
+            // Done reading the journal. Delete it and recreate a new one
+            journal.delete();
+            journal.createNewFile();
+        }
+    }
+
+
     /**
      * Read the data from the disk into this BTree. All the existing data in the 
      * BTree are kept, the read data will be associated with a new revision.
@@ -855,6 +1080,7 @@ public class BTree<K, V>
         BufferHandler bufferHandler = new BufferHandler( channel, buffer );
 
         long nbElems = LongSerializer.deserialize( bufferHandler.read( 8 ) );
+        this.nbElems.set( nbElems );
 
         // Prepare a list of keys and values read from the disk
         //List<K> keys = new ArrayList<K>();
@@ -888,7 +1114,14 @@ public class BTree<K, V>
      */
     public void flush() throws IOException
     {
-        flush( file );
+        if ( !inMemory )
+        {
+            // Then flush the file
+            flush( file );
+
+            // And delete the journal
+            journal.delete();
+        }
     }
 
 

Modified: labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java
URL: http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java?rev=1373216&r1=1373215&r2=1373216&view=diff
==============================================================================
--- labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java (original)
+++ labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTreeConfiguration.java Wed Aug 15 05:30:03 2012
@@ -53,14 +53,9 @@ public class BTreeConfiguration<K, V>
     private String filePath;
 
     /**
-     * The BTree file's name. Default to "mavibot".
+     * The BTree file's name.
      */
-    private String filePrefix = "mavibot";
-
-    /**
-     * The BTree file's suffix. Default to "data".
-     */
-    private String fileSuffix = "data";
+    private String fileName;
 
     /** 
      * The maximum delay to wait before a revision is considered as unused.
@@ -83,16 +78,12 @@ public class BTreeConfiguration<K, V>
     private String journalPath;
 
     /**
-     * The journal's name. Default to "mavibot".
-     */
-    private String journalPrefix = "mavibot";
-
-    /**
-     * The journal's suffix. Default to "log".
+     * The journal's name. Default to "mavibot.log".
      */
-    private String journalSuffix = "log";
+    private String journalName = BTree.DEFAULT_JOURNAL;
 
-    /** The delay between two checkpoints. When we reach the maximum delay,
+    /** 
+     * The delay between two checkpoints. When we reach the maximum delay,
      * the BTree is flushed on disk, but only if we have had some modifications.
      * The default value is 60 seconds.
      */
@@ -226,38 +217,20 @@ public class BTreeConfiguration<K, V>
 
 
     /**
-     * @return the filePrefix
-     */
-    public String getFilePrefix()
-    {
-        return filePrefix;
-    }
-
-
-    /**
-     * @param filePrefix the filePrefix to set
+     * @return the file name
      */
-    public void setFilePrefix( String filePrefix )
+    public String getFileName()
     {
-        this.filePrefix = filePrefix;
+        return fileName;
     }
 
 
     /**
-     * @return the fileSuffix
+     * @param fileName the file name to set
      */
-    public String getFileSuffix()
+    public void setFileName( String fileName )
     {
-        return fileSuffix;
-    }
-
-
-    /**
-     * @param fileSuffix the fileSuffix to set
-     */
-    public void setFileSuffix( String fileSuffix )
-    {
-        this.fileSuffix = fileSuffix;
+        this.fileName = fileName;
     }
 
 
@@ -280,37 +253,19 @@ public class BTreeConfiguration<K, V>
 
 
     /**
-     * @return the journalPrefix
-     */
-    public String getJournalPrefix()
-    {
-        return journalPrefix;
-    }
-
-
-    /**
-     * @param journalPrefix the journalPrefix to set
-     */
-    public void setJournalPrefix( String journalPrefix )
-    {
-        this.journalPrefix = journalPrefix;
-    }
-
-
-    /**
-     * @return the journalSuffix
+     * @return the journal name
      */
-    public String getJournalSuffix()
+    public String getJournalName()
     {
-        return journalSuffix;
+        return journalName;
     }
 
 
     /**
-     * @param journalSuffix the journalSuffix to set
+     * @param journalName the journal name to set
      */
-    public void setJournalSuffix( String journalSuffix )
+    public void setJournalName( String journalName )
     {
-        this.journalSuffix = journalSuffix;
+        this.journalName = journalName;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org