You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by sa...@apache.org on 2011/09/30 14:29:00 UTC

svn commit: r1177627 - in /directory/apacheds/trunk: i18n/src/main/java/org/apache/directory/server/i18n/ i18n/src/main/resources/org/apache/directory/server/i18n/ xdbm-partition/src/main/java/org/apache/directory/server/log/ xdbm-partition/src/main/ja...

Author: saya
Date: Fri Sep 30 12:28:59 2011
New Revision: 1177627

URL: http://svn.apache.org/viewvc?rev=1177627&view=rev
Log:
First set of changes towards a transactional system over partitions.

This is the first set of changes for the logging subsystem:
Log.java: interface to the logging subsystem. Logging subsystem logs uninterpreted blobs of user data.

LogManager.java: Manages deletion, creation and formatting of log files and reading/writing of control file. ControlFile is a special file where critical information for the logging subsystem is held. It includes checkpoint information. Control file is updated by writing to a shadow file first and then renaming(moving) the shadow file to the controlfile.

LogFlushManager.java: Manages logging of data to log files. Manages an internal circular buffer where user data is appended first and then flushed to the underlying log file either when the buffer is full or when one of the clients wants to sync the data. It is possible to specify a zero for the buffer size and directly append the data to the underlying file. Log flushing is done by the foreground threads (no special background thread is used to do it).

LogScanner.java, DefaultLogScanner.java: Implements forwards scanning of log files.

LogFileManager.java: defines an interface that exposes basic file operations on log files. It is possible to use different implementations to satisfy this interface. Currently, randomaccessfile is used but FileChannel or HDFS files can be used as well.

UserLogRecord.java: used to pass byte[] user log records and log position information between the clients and the logging subsystem.

LogFile format: Log file numbers start from 0 and keep increasing as new log files are created. A log file has a log file header which should be verified when the log file is opened for reading. User log records are uninterpreted blobs of data. A record header is prepended and a record footer is appended to the user log record. The record header and footer also carry verification information to detect corrupt/partially written log records. Record header/footer and file headers are used by the LogScanner implementation to scan the logs. This format makes it possible to log any data in the logging subsystem.



TODO: test, handle IO exceptions and other exceptions better.

Added:
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
    directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java
Modified:
    directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java
    directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties

Modified: directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java?rev=1177627&r1=1177626&r2=1177627&view=diff
==============================================================================
--- directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java (original)
+++ directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java Fri Sep 30 12:28:59 2011
@@ -778,7 +778,12 @@ public enum I18n
     ERR_742_CANNOT_ENCODE_ENC_TICKET_PART("ERR_742_CANNOT_ENCODE_ENC_TICKET_PART"),
     ERR_743_CANNOT_ENCODE_TYPED_DATA("ERR_743_CANNOT_ENCODE_TYPED_DATA"),
     ERR_744_NULL_PDU_LENGTH("ERR_744_NULL_PDU_LENGTH"),
-    ERR_745_NOT_A_KERBEROS_STRING("ERR_745_NOT_A_KERBEROS_STRING");
+    ERR_745_NOT_A_KERBEROS_STRING("ERR_745_NOT_A_KERBEROS_STRING"),
+    ERR_746("ERR_746"),
+    ERR_747("ERR_747"),
+    ERR_748("ERR_748"),
+    ERR_749("ERR_749"),
+    ERR_750("ERR_750");
     
     private static ResourceBundle errBundle = ResourceBundle
                 .getBundle( "org.apache.directory.server.i18n.errors" );

Modified: directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties?rev=1177627&r1=1177626&r2=1177627&view=diff
==============================================================================
--- directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties (original)
+++ directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties Fri Sep 30 12:28:59 2011
@@ -766,4 +766,9 @@ ERR_741_CANNOT_ENCODE_KRB_CRED=Cannot en
 ERR_742_CANNOT_ENCODE_ENC_TICKET_PART=Cannot encode the EncTicketPart object, the PDU size is {0} when only {1} bytes has been allocated
 ERR_743_CANNOT_ENCODE_TYPED_DATA=Cannot encode the TypedData object, the PDU size is {0} when only {1} bytes has been allocated
 ERR_744_NULL_PDU_LENGTH=The PDU length is null, this is not allowed
-ERR_745_NOT_A_KERBEROS_STRING=The value {0} is not a valid KerberosString
\ No newline at end of file
+ERR_745_NOT_A_KERBEROS_STRING=The value {0} is not a valid KerberosString
+ERR_746=Not a valid log file number  {0}
+ERR_747=Not a valid log file offset  {0}
+ERR_748=Invalid log file bufferSize/ max size is specified bufferSize {0} logFileSize {1}
+ERR_749=Log Scanner is already closed
+ERR_750=Log content is invalid
\ No newline at end of file

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,28 @@
+
+package org.apache.directory.server.log;
+
+/** 
+ * An exception used when the log content could be invalid.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+public class InvalidLogException extends Exception
+{
+    /** Explicit serial version so the serialized form does not depend on compiler-generated defaults */
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception with neither a detail message nor a cause.
+     */
+    public InvalidLogException()
+    {
+    }
+
+    /**
+     * Creates an exception with the given detail message.
+     *
+     * @param s the detail message
+     */
+    public InvalidLogException( String s )
+    {
+        super( s );
+    }
+
+    /**
+     * Creates an exception wrapping the given cause.
+     *
+     * @param cause the underlying failure
+     */
+    public InvalidLogException( Throwable cause )
+    {
+        super( cause );
+    }
+
+    /**
+     * Creates an exception with the given detail message and cause.
+     *
+     * @param s the detail message
+     * @param cause the underlying failure
+     */
+    public InvalidLogException( String s, Throwable cause )
+    {
+        super( s, cause );
+    }
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,40 @@
+
+package org.apache.directory.server.log;
+
+import java.io.IOException;
+
+/**
+ * Interface to the logging subsystem. The user records passed in are logged as
+ * uninterpreted data.
+ */
+public interface Log
+{
+    
+    /**
+     * Initializes the logging subsystem
+     *
+     * @param logFilepath log file path
+     * @param suffix suffix for log file.
+     * @param logBufferSize size of buffer that will hold unflushed log changes. Specify zero if no buffering is desired
+     * @param logFileSize A soft limit on the log file size
+     */
+   public void init( String logFilepath, String suffix, int logBufferSize, long logFileSize );
+    
+    /**
+     * Logs the given user record to the log. Position in the log files where the record is logged is returned as part of
+     * userRecord.
+     *
+     * @param userRecord provides the user data to be logged
+     * @param sync if true, this call returns after making sure that the appended data is reflected to the underlying media
+     * @throws IOException
+     * @throws InvalidLogException
+     */
+    public void log( UserLogRecord userRecord, boolean sync ) throws IOException, InvalidLogException;
+    
+    
+    /**
+     * Starts a scan in the logs starting from the given log position
+     *
+     * @param startPoint starting position of the scan.
+     * @return a scanner over the log records (presumably beginning at startPoint; confirm against the implementation)
+     */
+    public LogScanner beginScan( LogAnchor startPoint );
+
+    
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,83 @@
+
+package org.apache.directory.server.log;
+
+import org.apache.directory.server.i18n.I18n;
+
+/**
+ * Implements a pointer in to the log files
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+public class LogAnchor
+{
+    // TODO move this to logger 
+    /** Invalid/unknown lsn. Log LSN starts at UNKNOWN_LSN + 1 and is ever increasing */
+    public final static long UNKNOWN_LSN = Long.MIN_VALUE;
+    
+    /** Min log file number */
+    public final static long MIN_LOG_NUMBER = 0;
+    
+    /** Min log file offset */
+    public final static long MIN_LOG_OFFSET = 0;
+    
+    
+    /** log file identifier of the anchor */
+    private long logFileNumber = 0 ;
+    
+    /** Offset into the log file identified by logfilenumber */
+    private long logFileOffset = 0;
+    
+    /** LSN corresponding to the logFileNumber and fileOffset */
+    private long logLSN = UNKNOWN_LSN; 
+    
+    public LogAnchor()
+    {
+        
+    }
+    
+    public LogAnchor( long logFileNumber, long logFileOffset, long logLSN )
+    {
+        this.resetLogAnchor( logFileNumber, logFileOffset, logLSN );
+    }
+    
+    
+    public void resetLogAnchor( long logFileNumber, long logFileOffset, long logLSN )
+    {
+        if ( logFileNumber < 0 )
+        {
+            throw new IllegalArgumentException( I18n.err( I18n.ERR_746, logFileNumber ) );
+        }
+        
+        if ( logFileOffset < 0 )
+        {
+            throw new IllegalArgumentException( I18n.err( I18n.ERR_747, logFileOffset ) );
+        }
+        
+        
+        this.logFileNumber = logFileNumber;
+        this.logFileOffset = logFileOffset;
+        this.logLSN = logLSN;
+    }
+    
+    public void resetLogAnchor( LogAnchor logAnchor )
+    {
+        this.resetLogAnchor( logAnchor.getLogFileNumber(), logAnchor.getLogFileOffset(), logAnchor.getLogLSN() );
+    }
+    
+     
+    public long getLogFileNumber()
+    {
+        return this.logFileNumber;
+    }
+    
+    
+    public long getLogFileOffset()
+    {
+        return this.logFileOffset;
+    }
+    
+    
+    public long getLogLSN()
+    {
+        return this.logLSN;
+    }  
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,57 @@
+
+package org.apache.directory.server.log;
+
+import java.util.Comparator;
+
+import org.apache.directory.server.i18n.I18n;
+
+/**
+ * Orders log anchors by log file number first and by offset within the file second.
+ * The LSN carried by the anchors is not consulted.
+ */
+public class LogAnchorComparator implements Comparator<LogAnchor>
+{
+    /**
+     * Compare two log anchors.
+     *
+     * @param obj1 First object
+     * @param obj2 Second object
+     * @return 1 if obj1 is positioned after obj2, 0 if both point at the same
+     *         file number and offset, and -1 if obj1 is positioned before obj2
+     * @throws IllegalArgumentException if either anchor is null
+     */
+    public int compare( LogAnchor obj1, LogAnchor obj2 )
+    {
+        if ( obj1 == null )
+        {
+            throw new IllegalArgumentException( I18n.err( I18n.ERR_525 ) );
+        }
+
+        if ( obj2 == null )
+        {
+            throw new IllegalArgumentException( I18n.err( I18n.ERR_526 ) );
+        }
+
+        long leftFileNumber = obj1.getLogFileNumber();
+        long rightFileNumber = obj2.getLogFileNumber();
+
+        // Different files: the file number alone decides the order
+        if ( leftFileNumber != rightFileNumber )
+        {
+            return ( leftFileNumber > rightFileNumber ) ? 1 : -1;
+        }
+
+        long leftOffset = obj1.getLogFileOffset();
+        long rightOffset = obj2.getLogFileOffset();
+
+        // Same file: fall back to the offset
+        if ( leftOffset != rightOffset )
+        {
+            return ( leftOffset > rightOffset ) ? 1 : -1;
+        }
+
+        return 0;
+    }
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,42 @@
+
+package org.apache.directory.server.log;
+
+
+import java.io.IOException;
+
+/**
+ * Scanner over the user records stored in the log.
+ */
+public interface LogScanner
+{
+    /**
+     * Reads the next user record from the log into a backing byte array 
+     * and makes it available through the given record. The backing array can be 
+     * overwritten after the next call to getNextRecord()
+     *
+     * @param logRecord log record to be filled in with the next user record
+     * @return true if there is a next record
+     * @throws IOException
+     * @throws InvalidLogException thrown if the log content is invalid 
+     */
+    public boolean getNextRecord(UserLogRecord logRecord) throws IOException, InvalidLogException;
+    
+    
+    /**
+     * Returns the last successfully read log file number
+     *
+     * @return last successfully read log file number
+     */
+    public long getLastGoodFileNumber();
+    
+    /**
+     * Returns the last successfully read offset within the log file
+     *
+     * @return last successfully read log file offset
+     */
+    public long getLastGoodOffset();
+    
+    /**
+     * Closes the scanner and releases any
+     * resources. 
+     *
+     */
+    public void close();
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,47 @@
+
+package org.apache.directory.server.log;
+
+/** 
+ * A user log record that can be used to pass user record between the clients and the logger
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+public class UserLogRecord
+{
+    private final static int INITIAL_SIZE =  1024;
+
+    /** array used to hold user log records */
+    private byte[] recordHolder;
+
+    /** offset in the byte array where the user record starts; never assigned in this class, so it stays 0 */
+    int offset;
+
+    /** length of the user record in the byte array */
+    int length;
+
+    /** Position of the log record in the log */
+    private LogAnchor logAnchor = new LogAnchor();
+
+
+    /**
+     * Sets the user data carried by this record. The array is stored by reference,
+     * not copied.
+     *
+     * @param data array holding the user record
+     * @param length number of bytes of the array that belong to the user record
+     */
+    public void setData( byte[] data, int length )
+    {
+        // Store the arguments: assigning recordHolder to itself dropped both the data and its length
+        this.recordHolder = data;
+        this.length = length;
+    }
+
+
+    /**
+     * @return the array holding the user record, or null if no data has been set
+     */
+    public byte[] getDataBuffer()
+    {
+        return recordHolder;
+    }
+
+
+    /**
+     * @return the length of the user record inside the data buffer
+     */
+    public int getDataLength()
+    {
+        return length;
+    }
+
+
+    /**
+     * @return the anchor describing the position of this record in the log
+     */
+    public LogAnchor getLogAnchor()
+    {
+        return logAnchor;
+    }
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,243 @@
+
+package org.apache.directory.server.log.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+
+import java.io.RandomAccessFile;
+
+import org.apache.directory.server.i18n.I18n;
+import org.apache.directory.server.log.impl.LogFileManager.LogFileReader;
+import org.apache.directory.server.log.impl.LogFileManager.LogFileWriter;
+
+/**
+ * Log file manager that keeps the log files on the local file system and accesses
+ * them through {@link RandomAccessFile}.
+ */
+class DefaultLogFileManager implements LogFileManager 
+{
+    /** Directory under which the log files are kept */
+    private String logFilePath;
+
+    /** Suffix appended to every log file name */
+    private String suffix;
+
+
+    /**
+     * Inits the log file manager to use the given logfile path and the suffix. Each log file
+     * is named LogFileManager.LOG_NAME_PREFIX + logFileNumber + "." + suffix and is placed
+     * directly under the given path.
+     *
+     * @param logFilepath log file path
+     * @param suffix suffix for log file.
+     */
+    public void init( String logFilepath, String suffix )
+    {
+        // The parameter is spelled logFilepath (lowercase 'p'); assigning the field to itself left the path null
+        this.logFilePath = logFilepath;
+        this.suffix = suffix;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public LogFileReader getReaderForLogFile( long logFileNumber ) throws IOException, FileNotFoundException
+    {
+        File logFile = this.makeLogFileName( logFileNumber );
+
+        // In "r" mode this throws a FileNotFoundException if the file does not exist
+        RandomAccessFile raf = new RandomAccessFile( logFile, "r" );
+
+        return new LogFileReader( raf, logFileNumber );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public LogFileWriter getWriterForLogFile( long logFileNumber ) throws IOException, FileNotFoundException
+    {
+        File logFile = this.makeLogFileName( logFileNumber );
+
+        // In "rw" mode a missing file is created; FileNotFoundException is thrown only if
+        // the file can be neither opened nor created
+        RandomAccessFile raf = new RandomAccessFile( logFile, "rw" );
+
+        return new LogFileWriter( raf, logFileNumber );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean createLogFile( long logFileNumber ) throws IOException
+    {
+        File logFile = this.makeLogFileName( logFileNumber );
+
+        // createNewFile() returns false when the file is already there, so the
+        // result is true exactly when the file existed before this call
+        boolean fileAlreadyExists = !logFile.createNewFile();
+
+        return fileAlreadyExists;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void truncateLogFile( long logFileNumber, long size ) throws IOException, FileNotFoundException
+    {
+        if ( size < 0 )
+        {
+            throw new IllegalArgumentException( "Invalid file size is specified for the log file: " + logFileNumber + " " + size );
+        }
+
+        File logFile = this.makeLogFileName( logFileNumber );
+
+        // In "rw" mode a missing file is created; FileNotFoundException is thrown only if
+        // the file can be neither opened nor created
+        RandomAccessFile raf = new RandomAccessFile( logFile, "rw" );
+
+        try
+        {
+            raf.setLength( size );
+            raf.getFD().sync();
+        }
+        finally
+        {
+            // Always release the file handle, even when resizing or syncing fails
+            raf.close();
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void deleteLogFile( long logFileNumber )
+    {
+        File logFile = this.makeLogFileName( logFileNumber );
+
+        // Best effort: the outcome of the deletion is not reported to the caller
+        logFile.delete();
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean rename(long originalLogFileNumber, long newLongFileNumber)
+    {
+        File oldLogFile = this.makeLogFileName( originalLogFileNumber );
+        boolean result = oldLogFile.renameTo( this.makeLogFileName( newLongFileNumber ) );
+        return result;
+    }
+
+
+    /**
+     * Builds the file for the given log file number from the configured path, the
+     * log name prefix, the number and the suffix.
+     */
+    private File makeLogFileName( long logFileNumber )
+    {
+        return new File( logFilePath + "/" + LogFileManager.LOG_NAME_PREFIX + logFileNumber + "." + suffix );
+    }
+
+
+    /**
+     * Reader over a single log file, backed by a RandomAccessFile.
+     */
+    static class LogFileReader implements LogFileManager.LogFileReader
+    {
+        /** Underlying log file */
+        RandomAccessFile raf;
+
+        /** Log file identifier */
+        long logFileNumber;
+
+
+        public LogFileReader( RandomAccessFile raf, long logFileNumber )
+        {
+            this.raf = raf;
+            this.logFileNumber = logFileNumber;
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public int read( byte[] buffer, int offset, int length ) throws IOException, EOFException
+        {
+            // readFully either fills the requested range or throws EOFException,
+            // so on return exactly length bytes have been read
+            raf.readFully( buffer, offset, length );
+            return length;
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public void seek( long position ) throws IOException
+        {
+            raf.seek( position );
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public void close() throws IOException
+        {
+            raf.close();
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public long logFileNumber()
+        {
+            return logFileNumber;
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public long getLength() throws IOException
+        {
+            return raf.length();
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public long getOffset() throws IOException
+        {
+            return raf.getFilePointer();
+        }
+    }
+
+
+    /**
+     * Writer for a single log file, backed by a RandomAccessFile.
+     */
+    static class LogFileWriter implements LogFileManager.LogFileWriter
+    {
+        /** Underlying log file */
+        RandomAccessFile raf;
+
+        /** Log file identifier */
+        long logFileNumber;
+
+
+        public LogFileWriter( RandomAccessFile raf, long logFileNumber )
+        {
+            this.raf = raf;
+            this.logFileNumber = logFileNumber;
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public void append( byte[] buffer, int offset, int length ) throws IOException
+        {
+            // Writes at the current file pointer of the underlying file
+            raf.write( buffer, offset, length );
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public void sync() throws IOException
+        {
+            raf.getFD().sync();
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public void close() throws IOException
+        {
+            raf.close();
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public long logFileNumber()
+        {
+            return logFileNumber;
+        }
+
+
+        /**
+         * {@inheritDoc}
+         */
+        public long getLength() throws IOException
+        {
+            return raf.length();
+        }
+    }
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,354 @@
+    
+package org.apache.directory.server.log.impl;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+
+import org.apache.directory.server.i18n.I18n;
+import org.apache.directory.server.log.InvalidLogException;
+import org.apache.directory.server.log.LogAnchor;
+import org.apache.directory.server.log.LogScanner;
+import org.apache.directory.server.log.UserLogRecord;
+
+/**
+ * Forward-only scanner over a sequence of log files. Starting from a given
+ * anchor it reads one user log record per {@link #getNextRecord(UserLogRecord)}
+ * call, verifying the file header of every file it opens and the header and
+ * footer markers surrounding every record. Once invalid content is met the
+ * scanner is marked invalid and refuses further reads.
+ */
+public class DefaultLogScanner implements LogScanner
+{
+    /** LSN of the last successfully read log record */
+    private long prevLSN = LogAnchor.UNKNOWN_LSN;
+
+    /** File number of the last successfully read position */
+    private long prevLogFileNumber = -1;
+
+    /** Offset, within that file, of the last known good position */
+    private long prevLogFileOffset = -1;
+
+    /** Position the scan starts reading from */
+    private LogAnchor startingLogAnchor = new LogAnchor();
+
+    /** LSN found in the most recently verified record header */
+    private long lastReadLSN = LogAnchor.UNKNOWN_LSN;
+
+    /** Current log file pointer to read from */
+    LogFileManager.LogFileReader currentLogFile;
+
+    /** True if scanner is closed */
+    boolean closed = false;
+
+    /** True if scanner hit invalid content. No more reads will be done after invalid log content is hit */
+    boolean invalidLog = false;
+
+    /** log file manager used to open files for reading */
+    LogFileManager logFileManager;
+
+    /** Buffer used to read log file markers */
+    byte markerBuffer[] = new byte[LogFileRecords.MAX_MARKER_SIZE];
+
+    /** ByteBuffer wrapper for the marker buffer */
+    ByteBuffer markerHead = ByteBuffer.wrap( markerBuffer );
+
+
+    /**
+     * Creates a scanner that starts reading at the given anchor.
+     *
+     * @param startingLogAnchor position to start the scan from; its content is copied
+     * @param logFileManager manager used to open the log files for reading
+     */
+    public DefaultLogScanner( LogAnchor startingLogAnchor, LogFileManager logFileManager )
+    {
+        // The parameters shadow the fields: qualify the fields explicitly so that
+        // the anchor is copied into the scanner and the manager is actually stored.
+        this.startingLogAnchor.resetLogAnchor( startingLogAnchor );
+        this.logFileManager = logFileManager;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean getNextRecord(UserLogRecord logRecord) throws IOException, InvalidLogException
+    {
+        boolean startingRead = false;
+
+        checkIfClosed();
+
+        if ( invalidLog )
+        {
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+        }
+
+        long fileLength;
+        long fileOffset;
+
+        try
+        {
+            if ( currentLogFile == null )
+            {
+                long startingOffset = startingLogAnchor.getLogFileOffset();
+
+                // Read and verify header
+                currentLogFile = this.readFileHeader( startingLogAnchor.getLogFileNumber() );
+
+                if ( currentLogFile == null )
+                {
+                    return false; // Nothing to read
+                }
+
+                if ( startingOffset > 0 )
+                {
+                    if ( startingOffset < LogFileRecords.LOG_FILE_HEADER_SIZE )
+                    {
+                        // Offset should be at log file marker boundary
+                        this.markScanInvalid();
+                    }
+
+                    // NOTE(review): Math.max records a "good" offset that may lie beyond the
+                    // requested start; Math.min may have been intended - confirm before changing.
+                    prevLogFileOffset = Math.max( startingOffset, currentLogFile.getLength() );
+                    currentLogFile.seek( startingOffset );
+                }
+
+                startingRead = true;
+            }
+
+            while ( true )
+            {
+                fileLength = currentLogFile.getLength();
+                fileOffset = currentLogFile.getOffset();
+
+                if ( fileOffset > fileLength )
+                {
+                    this.markScanInvalid();
+                }
+                else if ( fileOffset == fileLength )
+                {
+                    // Switch to next file.. This reads and verifies the header of the new file
+                    long nextLogFileNumber = currentLogFile.logFileNumber() + 1;
+                    currentLogFile.close();
+                    currentLogFile = this.readFileHeader( nextLogFileNumber );
+
+                    if ( currentLogFile == null )
+                    {
+                        return false; // Done.. End of log stream
+                    }
+                }
+                else
+                {
+                    break;  // break to read the user record
+                }
+            }
+
+            // Read and verify record header. The returned length covers the
+            // header, the user data and the footer.
+            int recordLength = this.readRecordHeader();
+
+            // If starting read, then check if we have the expected lsn in case
+            // expected lsn is known
+            if ( startingRead )
+            {
+                long startingLSN = startingLogAnchor.getLogLSN();
+
+                if ( ( startingLSN != LogAnchor.UNKNOWN_LSN ) && ( startingLSN != lastReadLSN ) )
+                {
+                    this.markScanInvalid();
+                }
+            }
+
+            // Read the user block: only the payload lies between header and footer,
+            // so strip the marker overhead from the total record length.
+            int userDataLength = recordLength
+                - ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE );
+            this.readLogRecord( logRecord, userDataLength );
+
+            // Read and verify footer
+            this.readRecordFooter();
+
+            // If we are here, then we successfully read the log record.
+            // Set the read record's position, update last read good location
+            // and then return
+            fileOffset = currentLogFile.getOffset();
+
+            LogAnchor userLogAnchor = logRecord.getLogAnchor();
+            userLogAnchor.resetLogAnchor( currentLogFile.logFileNumber(), fileOffset - recordLength, lastReadLSN );
+
+            prevLogFileOffset = fileOffset;
+            prevLogFileNumber = currentLogFile.logFileNumber();
+            prevLSN = lastReadLSN;
+        }
+        catch( EOFException e)
+        {
+            // This means either the log record or the log file header was
+            // partially written. Treat this as invalid log content
+            this.markScanInvalid();
+        }
+
+        return true;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public long getLastGoodFileNumber()
+    {
+        return this.prevLogFileNumber;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public long getLastGoodOffset()
+    {
+        return this.prevLogFileOffset;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void close()
+    {
+        if ( closed == false )
+        {
+            closed = true;
+
+            if (currentLogFile != null)
+            {
+                try
+                {
+                    currentLogFile.close();
+                    currentLogFile = null;
+                }
+                catch( IOException e )
+                {
+                    // Ignore
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Reads the record header at the current position of the current log file and
+     * verifies its magic number, length, LSN ordering and header checksum.
+     *
+     * @return the length stored in the header
+     * @throws InvalidLogException if any of the header checks fails
+     * @throws IOException if reading from the log file fails
+     */
+    private int readRecordHeader() throws IOException, InvalidLogException, EOFException
+    {
+        boolean invalid = false;
+
+        markerHead.rewind();
+        currentLogFile.read( markerBuffer, 0, LogFileRecords.RECORD_HEADER_SIZE );
+        int magicNumber = markerHead.getInt();
+        int length = markerHead.getInt();
+        long lsn = markerHead.getLong();
+        long checksum = markerHead.getLong();
+
+        if ( magicNumber != LogFileRecords.RECORD_HEADER_MAGIC_NUMBER )
+        {
+            invalid = true;
+        }
+
+        if ( length <= ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE ) )
+        {
+            invalid = true;
+        }
+
+        if ( lsn < prevLSN )
+        {
+            invalid = true;
+        }
+
+        if ( checksum != ( lsn ^ length ) )
+        {
+            invalid = true;
+        }
+
+        if ( invalid == true )
+        {
+            this.markScanInvalid();
+        }
+
+        // Everything went fine
+        lastReadLSN = lsn;
+        return length;
+    }
+
+
+    /**
+     * Reads the record footer at the current position of the current log file and
+     * verifies its magic number. The footer checksum is read but not verified yet.
+     *
+     * @throws InvalidLogException if the footer magic number does not match
+     * @throws IOException if reading from the log file fails
+     */
+    private void readRecordFooter() throws IOException, InvalidLogException, EOFException
+    {
+        boolean invalid = false;
+
+        markerHead.rewind();
+        currentLogFile.read( markerBuffer, 0, LogFileRecords.RECORD_FOOTER_SIZE );
+        int checksum = markerHead.getInt();
+        int magicNumber = markerHead.getInt();
+
+        if ( magicNumber != LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER )
+        {
+            invalid = true;
+        }
+
+        // TODO compute checksum
+
+        if ( invalid == true )
+        {
+            this.markScanInvalid();
+        }
+    }
+
+
+    /**
+     * Reads the user data of a record into the given user record, reusing the
+     * record's own buffer when it is large enough.
+     *
+     * @param userRecord record receiving the data
+     * @param length number of user data bytes to read
+     * @throws IOException if reading from the log file fails
+     */
+    private void readLogRecord( UserLogRecord userRecord, int length ) throws IOException, EOFException
+    {
+        byte dataBuffer[] = userRecord.getDataBuffer();
+
+        if ( dataBuffer == null || dataBuffer.length < length )
+        {
+            // Allocate a larger buffer
+            dataBuffer = new byte[length];
+        }
+
+        currentLogFile.read( dataBuffer, 0, length );
+        userRecord.setData( dataBuffer, length );
+    }
+
+
+    /**
+     * Opens the log file with the given number and verifies its file header.
+     *
+     * @param logFileNumber identifier of the log file to open
+     * @return a reader positioned after the file header, or null if the file does not exist
+     * @throws InvalidLogException if the persisted file number or magic number does not match
+     * @throws IOException if opening or reading the log file fails
+     */
+    private LogFileManager.LogFileReader readFileHeader( long logFileNumber ) throws IOException, InvalidLogException, EOFException
+    {
+        LogFileManager.LogFileReader logFile;
+
+        try
+        {
+            logFile = logFileManager.getReaderForLogFile( logFileNumber );
+        }
+        catch ( FileNotFoundException e )
+        {
+            return null; // end of log scan
+        }
+
+        // File exists
+        this.prevLogFileNumber = logFileNumber;
+        this.prevLogFileOffset = 0;
+
+        boolean headerVerified = false;
+
+        try
+        {
+            // The header must be read from the file that was just opened; currentLogFile
+            // is still null (first read) or refers to the previous, closed file here.
+            markerHead.rewind();
+            logFile.read( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE );
+            long persistedLogFileNumber = markerHead.getLong();
+            int magicNumber = markerHead.getInt();
+
+            if ( ( persistedLogFileNumber != logFileNumber ) ||
+                ( magicNumber != LogFileRecords.LOG_FILE_HEADER_MAGIC_NUMBER ) )
+            {
+                this.markScanInvalid();
+            }
+
+            headerVerified = true;
+        }
+        finally
+        {
+            if ( !headerVerified )
+            {
+                // The reader is never handed to the caller on failure, release it here
+                try
+                {
+                    logFile.close();
+                }
+                catch ( IOException ignored )
+                {
+                    // Best effort: the failure that brought us here is the one to report
+                }
+            }
+        }
+
+        // Everything is fine, advance good file offset and return
+        this.prevLogFileOffset = LogFileRecords.LOG_FILE_HEADER_SIZE;
+        return logFile;
+    }
+
+
+    /**
+     * Fails with an IllegalStateException if the scanner was already closed.
+     */
+    private void checkIfClosed()
+    {
+        if ( closed == true )
+        {
+            throw new IllegalStateException( I18n.err( I18n.ERR_749 ) );
+        }
+    }
+
+
+    /**
+     * Marks the scan as invalid so that no further reads are attempted, then
+     * reports the invalid content to the caller. Always throws.
+     *
+     * @throws InvalidLogException always
+     */
+    private void markScanInvalid() throws InvalidLogException
+    {
+        invalidLog = true;
+        throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+    }
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,175 @@
+package org.apache.directory.server.log.impl;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+
+
+/**
+ * Defines an interface that the log manager can use to manage log files:
+ * creating, opening for read or write, truncating, renaming and deleting them.
+ * Log files are addressed by their log file number.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+interface LogFileManager
+{
+    /** Name prefix for log files (presumably prepended to the log file number - confirm against the implementation) */
+    final static String LOG_NAME_PREFIX = "log_"; 
+  
+    /**
+     * Inits the log file manager to use the given logfile path and the suffix. Each log file
+     * has name logFileName_&lt;logFileNumber&gt;.suffix 
+     *
+     * @param logFilepath log file path
+     * @param suffix suffix for log file.
+     */
+    public void init( String logFilepath, String suffix );
+    
+    
+    /**
+     * Returns a reader for the given log file number
+     *
+     * @param logFileNumber identifier of the log file to read
+     * @return reader for the given logfile
+     * @throws IOException
+     * @throws FileNotFoundException
+     */
+    public LogFileReader getReaderForLogFile( long logFileNumber ) throws IOException, FileNotFoundException;
+    
+  
+    /**
+     * Returns a writer for the given log file number
+     *
+     * @param logFileNumber identifier of the log file to write to
+     * @return writer for the given logfile
+     * @throws IOException
+     * @throws FileNotFoundException
+     */
+    public LogFileWriter getWriterForLogFile( long logFileNumber ) throws IOException, FileNotFoundException;
+    
+    
+    /**
+     * Create a log file with the given identifier
+     * 
+     * @param logFileNumber identifier of the log file to create
+     * @return true if file already existed
+     * @throws IOException
+     */
+    public boolean createLogFile( long logFileNumber ) throws IOException;
+    
+       
+    /**
+     * Truncates the file to the given size. Mostly used for throwing away
+     * junk at the end of log file after a log replay after a crash.
+     *
+     * @param logFileNumber identifier of the log file
+     * @param size new size of the file
+     * @throws IOException
+     */
+    public void truncateLogFile( long logFileNumber, long size ) throws IOException;
+    
+    /**
+     * Deletes the underlying log file.
+     *
+     * @param logFileNumber identifier of the log file
+     */
+    public void deleteLogFile( long logFileNumber ); 
+    
+    
+    /**
+     * Moves the old log file to a new name
+     *
+     * @param orignalLogFileNumber identifier of the old file
+     * @param newLongFileNumber identifier of the new file
+     * @return true if the rename succeeded
+     */
+    public boolean rename(long orignalLogFileNumber, long newLongFileNumber);
+    
+    
+    /**
+     * Read access to a single log file.
+     */
+    interface LogFileReader
+    {
+        /**
+         * Reads from the file at the current position 
+         *
+         * @param buffer data destination
+         * @param offset destination offset
+         * @param length size of read
+         * @return number of bytes actually read.
+         * @throws IOException
+         * @throws EOFException presumably when the end of the file is hit before the read is satisfied - confirm against the implementation
+         */
+        public int read( byte[] buffer, int offset, int length ) throws IOException, EOFException;
+        
+
+        /**
+         * Repositions the reader at the given offset
+         *
+         * @param position offset the next read starts from
+         * @throws IOException
+         */
+        public void seek( long position ) throws IOException;
+        
+        /**
+         * Closes the log file reader and releases the resources 
+         *
+         * @throws IOException
+         */
+        public void close() throws IOException;
+        
+        
+        /**
+         * Each log file is assigned a sequence number. This method
+         * returns that number
+         *
+         * @return number assigned to this log file
+         */
+        public long logFileNumber();
+        
+        /**
+         * Returns the length of the file
+         *
+         * @return length of the file
+         * @throws IOException
+         */
+        public long getLength() throws IOException;
+        
+        /**
+         * Returns the offset of the next read
+         *
+         * @return offset of the next read
+         * @throws IOException
+         */
+        public long getOffset() throws IOException;
+    }
+    
+    /**
+     * Write access to a single log file.
+     */
+    interface LogFileWriter
+    {
+        /**
+         * Append the given data to the log file 
+         *
+         * @param buffer source of data
+         * @param offset offset into buffer
+         * @param length number of bytes to be appended
+         * @throws IOException
+         */
+        public void append( byte[] buffer, int offset, int length ) throws IOException;
+        
+        
+        
+        /**
+         * Sync the file contents to media  
+         *
+         * @throws IOException
+         */
+        public void sync() throws IOException;
+        
+        /**
+         * Closes the log file writer and releases the resources 
+         *
+         * @throws IOException
+         */
+        public void close() throws IOException;
+        
+        /**
+         * Each log file is assigned a sequence number. This method
+         * returns that number
+         *
+         * @return number assigned to this log file
+         */
+        public long logFileNumber();
+        
+        /**
+         * Returns the length of the file
+         *
+         * @return length of the file
+         * @throws IOException
+         */
+        public long getLength() throws IOException;
+
+    }
+    
+}
\ No newline at end of file

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,62 @@
+
+package org.apache.directory.server.log.impl;
+
+
+public class LogFileRecords
+{
+    /*
+     * Every log record appended by a user is wrapped in a header and a
+     * footer marker. The markers make it easier to scan the log, to detect
+     * the end of the log during a scan and to verify the integrity of the log.
+     */
+
+    /*
+     * Record header marker layout:
+     *   int  RECORD_HEADER_MAGIC_NUMBER
+     *   int  length          length of header + user log record + length of footer
+     *   long recordLSN       lsn of the log record
+     *   long headerChecksum  checksum to verify header
+     */
+
+    /** Magic number stored at the start of a record header */
+    static final int RECORD_HEADER_MAGIC_NUMBER = 0x010F010F;
+
+    /** Total size of a record header in bytes */
+    static final int RECORD_HEADER_SIZE = 24;
+
+    /*
+     * Record footer marker layout:
+     *   int checksum
+     *   int RECORD_FOOTER_MAGIC_NUMBER
+     */
+
+    /** Magic number stored at the end of a record footer */
+    static final int RECORD_FOOTER_MAGIC_NUMBER = 0x0F010F01;
+
+    /** Total size of a record footer in bytes */
+    static final int RECORD_FOOTER_SIZE = 8;
+
+    /*
+     * Log file header marker layout:
+     *   long log file number
+     *   int  LOG_FILE_HEADER_MAGIC_NUMBER 0xFF00FF00
+     */
+
+    /** Total size of the log file header marker in bytes */
+    static final int LOG_FILE_HEADER_SIZE = 12;
+
+    /** Magic number stored in the log file header */
+    static final int LOG_FILE_HEADER_MAGIC_NUMBER = 0xFF00FF00;
+
+    /** Size of the largest of the three markers above */
+    static final int MAX_MARKER_SIZE =
+        Math.max( Math.max( RECORD_HEADER_SIZE, RECORD_FOOTER_SIZE ), LOG_FILE_HEADER_SIZE );
+
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,500 @@
+
+package org.apache.directory.server.log.impl;
+
+import java.nio.ByteBuffer;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import java.io.IOException;
+
+import org.apache.directory.server.i18n.I18n;
+import org.apache.directory.server.log.InvalidLogException;
+import org.apache.directory.server.log.LogAnchor;
+import org.apache.directory.server.log.UserLogRecord;
+
+/**
+ * Manages the flushing of the log to media and the scanning of logs. All appends to the log file go through this class.
+ *
+ * Internally it manages a circular buffer where appends initially go: records are first
+ * appended to this in-memory circular buffer. When the buffer fills up, or when the user requests it,
+ * the memory buffer is flushed to the underlying media.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+class LogFlushManager
+{
+    
+    /** Ever increasing logical log sequence number assigned to user log records. Bumped up under append lock */
+    private long logLSN = Long.MIN_VALUE + 1;
+       
+    /** Memory buffer size in bytes */
+    private final int logBufferSize;
+        
+    /** Synchronizes appends */
+    private final Lock appendLock = new ReentrantLock();
+    
+    /** Synchronizes flushes to media */
+    private final Lock flushLock = new ReentrantLock();
+    
+    /** Used to wait on ongoing flush */
+    private final Condition flushCondition = flushLock.newCondition();
+     
+    /** In memory LogBuffer */
+    private LogBuffer logBuffer;
+    
+    /** Flush status */
+    private FlushStatus flushStatus = new FlushStatus(); 
+       
+    /** Current LogFile appends go to */
+    private LogFileManager.LogFileWriter currentLogFile;
+
+    /** Log manager */
+    LogManager logManager;
+
+    /** Size of data appended to the currentLogFile so far */
+    long appendedSize;
+    
+    /** Soft limit on the log file size */
+    long targetLogFileSize;
+    
+    /**
+     * Creates a flush manager with the given in-memory buffer size and target log file size.
+     *
+     * @param logManager log manager this flush manager works with
+     * @param logMemoryBufferSize size of the in-memory log buffer in bytes, must not be negative
+     * @param logFileSize target size of a log file, must not be negative
+     * @throws IllegalArgumentException if one of the sizes is negative
+     */
+    public LogFlushManager(LogManager logManager, int logMemoryBufferSize, long logFileSize )
+    {
+        boolean sizesAreValid = ( logMemoryBufferSize >= 0 ) && ( logFileSize >= 0 );
+
+        if ( !sizesAreValid )
+        {
+            throw new IllegalArgumentException( I18n.err( I18n.ERR_748, logMemoryBufferSize, logFileSize ) );
+        }
+
+        this.logManager = logManager;
+        this.targetLogFileSize = logFileSize;
+        this.logBufferSize = logMemoryBufferSize;
+
+        // currentLogFile has not been assigned yet at this point, so the buffer receives null
+        this.logBuffer = new LogBuffer( logMemoryBufferSize, this.currentLogFile );
+    }
+    
+    /**
+     * Appends the given user record to the log. Position where the record is appended is returned as part of
+     * userRecord.
+     *
+     * Records that fit in the memory buffer are copied into the circular buffer (flushing it when the write head
+     * would run into the read head); larger records bypass the buffer and are written directly through flush.
+     * The append lock is held for the whole append and is always released, even when the append fails.
+     *
+     * @param userRecord provides the user data to be appended to the log
+     * @param sync if true, this call returns after making sure that the appended data is reflected to the underlying file
+     * @throws IOException if flushing to or switching the log file fails
+     * @throws InvalidLogException declared for callers; not thrown directly by this method
+     */
+    public void append(UserLogRecord userRecord, boolean sync ) throws IOException, InvalidLogException
+    {
+        long lsn;
+        boolean appendedRecord = false;
+        byte[] userBuffer = userRecord.getDataBuffer();
+        int length  = userRecord.getDataLength();
+        LogAnchor userLogAnchor = userRecord.getLogAnchor();
+
+        int recordSize = LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE + length;
+
+        appendLock.lock();
+
+        try
+        {
+            lsn = logLSN++;
+
+            if ( currentLogFile == null )
+            {
+                // We are just starting, get the current log file
+                currentLogFile = logManager.switchToNextLogFile( null );
+                appendedSize = currentLogFile.getLength();
+            }
+
+            if ( appendedSize > this.targetLogFileSize )
+            {
+                // Make sure everything outstanding goes to the current log file
+                this.flush( lsn, null, 0, 0, true);
+
+                currentLogFile = logManager.switchToNextLogFile( currentLogFile );
+                appendedSize = currentLogFile.getLength();
+            }
+
+            if ( recordSize <= logBufferSize )
+            {
+                ByteBuffer writeHead = logBuffer.writeHead;
+
+                while ( !appendedRecord )
+                {
+                    // First get the rewind count then the position to which the readhead advanced
+                    int readHeadRewindCount = logBuffer.readHeadRewindCount.get();
+                    int readHeadPosition = logBuffer.readHeadPosition;
+
+                    if ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount ) ||
+                        ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount + 1 ) &&
+                            ( readHeadPosition < writeHead.position() ) ) )
+                    {
+                        if ( writeHead.remaining() >= recordSize )
+                        {
+                            this.writeHeader( writeHead, length, lsn );
+                            writeHead.put( userBuffer, 0, length );
+                            this.writeFooter( writeHead, 0 );
+                            appendedRecord = true;
+                        }
+                        else // ( writeHead.remaining() < recordSize )
+                        {
+                            if ( writeHead.remaining() >= LogFileRecords.RECORD_HEADER_SIZE )
+                            {
+                                // Write a skip record
+                                this.writeHeader( writeHead, -1, -1 );
+                            }
+
+                            // rewind buffer now
+                            writeHead.rewind();
+                            logBuffer.writeHeadRewindCount++;
+                        }
+                    }
+                    else
+                    {
+                        assert( logBuffer.writeHeadRewindCount == ( readHeadRewindCount + 1 ) ) :
+                                "Unexpected sequence number for read/write heads:" + logBuffer.writeHeadRewindCount +
+                                " " + readHeadRewindCount;
+
+                        if ( ( readHeadPosition - writeHead.position() ) > recordSize )
+                        {
+                            this.writeHeader( writeHead, length, lsn );
+                            writeHead.put( userBuffer, 0, length );
+                            this.writeFooter( writeHead, 0 );
+                            appendedRecord = true;
+                        }
+                        else
+                        {
+                            // Not enough room before the read head: flush to make it advance
+                            this.flush( lsn, null, 0, 0, true);
+                        }
+                    }
+                }
+            }
+            else
+            {
+                // Record is larger than the memory buffer: write it straight to the file
+                this.flush( lsn, userBuffer, 0, length, true );
+            }
+
+            userLogAnchor.resetLogAnchor( currentLogFile.logFileNumber(), appendedSize, lsn );
+            this.appendedSize += recordSize;
+        }
+        finally
+        {
+            // Release the lock on every exit path, otherwise a failed append
+            // would block all subsequent appends forever
+            appendLock.unlock();
+        }
+
+        if ( sync )
+        {
+            this.flush( lsn, null, 0, 0, false );
+        }
+    }
+    
+    /**
+     * Flushes the changes in the log buffer upto the given point. The given point is determined as follows:
+     * appendLock is held: flushLSN is the highest lsn generated by the logging system and no more appends can
+     * proceed. In this case log is flushed until where the write head is. Log record with the flushLSN might not
+     * have been appended yet.
+     * 
+     * Otherwise: Given flushLSN is appended to the log already. Log is flushed upto max(flushLSN, current flushStatus.uptoLSN)
+     * 
+     * Also userBuffer != null => appendLockHeld == true
+     * 
+     * Only one thread can do flush. Once a thread finds out that a flush is already going on, it waits for the ongoing flush
+     * and is woken up to do its flush.
+     * 
+     * flushStatus.uptoLSN represents the highest lsn that any thread wanted to sync. If a couple of threads wait on sync to
+     * complete, the thread that wakes up and does the sync will take it for the team and sync upto flushStatus.uptoLSN so 
+     * that logging is more efficient.
+     * 
+     * If the flush fails with an exception, the flushed LSN is left untouched, but the in-progress marker is
+     * cleared and waiting threads are woken up so that they do not wait forever.
+     *
+     * @param flushLSN max LSN the calling thread wants to sync upto
+     * @param userBuffer if not null, user buffer is appended to the log without any buffering
+     * @param offset offset of data in user buffer
+     * @param length length of user data
+     * @param appendLockHeld true if append lock is held
+     * @throws IOException if writing to or syncing the log file fails
+     */
+    private void flush( long flushLSN, byte[] userBuffer, int offset, int length, 
+                        boolean appendLockHeld ) throws IOException
+    {
+        long uptoLSN = flushLSN;
+
+        if ( appendLockHeld == true )
+        {
+            // The record with flushLSN might not be in the buffer yet
+            uptoLSN--;
+        }
+
+        flushLock.lock();
+
+        try
+        {
+            // Update max requested lsn if necessary
+            if ( uptoLSN > flushStatus.uptoLSN )
+            {
+                flushStatus.uptoLSN = uptoLSN;
+            }
+
+            /*
+             * Check if we need to do flush and wait for ongoing flush if
+             * necessary
+             */
+            while ( true )
+            {
+                if ( ( flushStatus.flushedLSN >= uptoLSN ) && ( appendLockHeld == false ) )
+                {
+                    // Somebody else already flushed what we need
+                    return;
+                }
+
+                if ( flushStatus.flushInProgress == false )
+                {
+                    break;
+                }
+
+                flushStatus.numWaiters++;
+                flushCondition.awaitUninterruptibly();
+                flushStatus.numWaiters--;
+            }
+
+            // Mark flush in progress and do the flush
+            flushStatus.flushInProgress = true;
+
+            // If not appendlock held, adjust uptoLSN with the max one requested by any thread
+            if ( appendLockHeld == false )
+            {
+                uptoLSN = flushStatus.uptoLSN;
+            }
+            else
+            {
+                uptoLSN = flushLSN;
+            }
+        }
+        finally
+        {
+            flushLock.unlock();
+        }
+
+        long flushedLSN = LogAnchor.UNKNOWN_LSN;
+        boolean flushCompleted = false;
+
+        try
+        {
+            flushedLSN = this.doFlush( uptoLSN, appendLockHeld );
+
+            // Now if there is a user buffer, flush from that
+            if ( userBuffer != null )
+            {
+                ByteBuffer headerFooterHead = logBuffer.headerFooterHead;
+
+                headerFooterHead.rewind();
+                this.writeHeader( headerFooterHead, length, flushLSN );
+                // Write exactly one header: the byte count is the header size, not the magic number
+                currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_HEADER_SIZE );
+
+                currentLogFile.append( userBuffer, offset, length );
+
+                headerFooterHead.rewind();
+                this.writeFooter( headerFooterHead, 0 );
+                currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_FOOTER_SIZE );
+
+                flushedLSN = flushLSN;
+            }
+
+            currentLogFile.sync();
+            flushCompleted = true;
+        }
+        finally
+        {
+            flushLock.lock();
+
+            try
+            {
+                // Only publish the new flushed LSN when everything reached the media
+                if ( flushCompleted && ( flushedLSN != LogAnchor.UNKNOWN_LSN ) )
+                {
+                    flushStatus.flushedLSN = flushedLSN;
+
+                    if ( flushStatus.flushedLSN > flushStatus.uptoLSN )
+                    {
+                        // This should only happen with append lock held
+                        assert( appendLockHeld == true ) : "FlushedLSN went ahead of uptoLSN while appendlock is not held: " + flushStatus.flushedLSN + "  " + flushStatus.uptoLSN;
+
+                        flushStatus.uptoLSN = flushStatus.flushedLSN;
+                    }
+                }
+            }
+            finally
+            {
+                // Always hand the flush over, even on failure, so waiters are not stuck
+                flushStatus.flushInProgress = false;
+
+                if ( flushStatus.numWaiters != 0 )
+                {
+                    flushCondition.signalAll();
+                }
+
+                flushLock.unlock();
+            }
+        }
+    }
+    
+    
+    /**
+     * Walks the log buffer and writes it to the underlying log file until the uptoLSN or current write head.
+     * Records are scanned from the read head one at a time; every contiguous run of scanned records is
+     * written with a single append. If the end of the buffer (or a skip record) is reached before the
+     * flush is done, the read head is rewound to the start of the buffer and the walk continues.
+     *
+     * @param uptoLSN max LSN until where log is flushed
+     * @param appendLockHeld true if appendlock held.
+     * @return lsn upto which flush is done. UNKNOWN_LSN if no flushing is done.
+     * @throws IOException
+     */
+    private long doFlush( long uptoLSN, boolean appendLockHeld  ) throws IOException
+    {
+        ByteBuffer readHead = logBuffer.readHead;
+        ByteBuffer writeHead = logBuffer.writeHead;
+        boolean done = false;
+
+        int magicNumber;
+        int length;
+        long lsn = LogAnchor.UNKNOWN_LSN;
+
+        while ( !done )
+        {
+            // Number of bytes scanned in this pass that still have to be appended to the file
+            int totalLength = 0;
+
+            while( true )
+            {
+                /*
+                 * If append lock is held, we might hit write head. We can read
+                 * the write head here when append lock is held
+                 */
+                if ( appendLockHeld )
+                {
+                    if ( ( writeHead.position() == readHead.position() ) &&
+                            ( logBuffer.writeHeadRewindCount == logBuffer.readHeadRewindCount.get() ) )
+                    {
+                        done = true;
+                        break;
+                    }
+                }
+
+                // If less than header length left to process, then break and flush whatever we got so far
+                if ( readHead.remaining() < LogFileRecords.RECORD_HEADER_SIZE )
+                {
+                    break;
+                }
+
+                magicNumber = readHead.getInt();
+
+                assert( magicNumber == LogFileRecords.RECORD_HEADER_MAGIC_NUMBER ) : " Record header magic " +
+                    "number does not match " + magicNumber + " expected "+
+                    LogFileRecords.RECORD_HEADER_MAGIC_NUMBER;
+
+                length = readHead.getInt();
+
+                // Did we hit a skip record at the end of the buffer?
+                if ( length == LogBuffer.SKIP_RECORD_LENGTH )
+                {
+                    break;
+                }
+
+                // Sanitize length, it includes header and footer overhead. The lower bound is the
+                // combined SIZE of the header and the footer, not the sum of their magic numbers.
+                assert( length >  ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE ) ) :
+                    "Record length doesnt make sense:" + length + " expected:" +
+                    ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE );
+
+                // Add to the total length
+                totalLength += length;
+
+                lsn = readHead.getLong();
+
+                // Move to the next record, we processed 16 bytes already
+                // (magic number int + length int + lsn long)
+                readHead.position( readHead.position() + length - 16 );
+
+                if ( lsn >= uptoLSN )
+                {
+                    done = true;
+                    break;
+                }
+            }
+
+            // If there is something to flush, then do it now
+            if ( totalLength > 0 )
+            {
+                int offset = logBuffer.readHeadPosition;
+
+                currentLogFile.append( logBuffer.buffer, offset, totalLength );
+
+                //move the position to the next record
+                logBuffer.readHeadPosition = readHead.position();
+            }
+
+            if ( !done )
+            {
+                // this means we need to rewind and keep flushing
+                logBuffer.readHeadPosition = 0;
+                readHead.rewind();
+                logBuffer.readHeadRewindCount.incrementAndGet();
+            }
+        }
+
+        return lsn;
+    }
+    
+    /**
+     * Formats a record header at the current position of the given buffer: the header magic number,
+     * the record length, the lsn and finally the value of length XOR lsn.
+     *
+     * @param buffer buffer the header is written into
+     * @param length record length stored in the header
+     * @param lsn lsn stored in the header
+     */
+    private void writeHeader ( ByteBuffer buffer, int length, long lsn )
+    {
+        long lengthXorLsn = length ^ lsn;
+
+        buffer.putInt( LogFileRecords.RECORD_HEADER_MAGIC_NUMBER )
+            .putInt( length )
+            .putLong( lsn )
+            .putLong( lengthXorLsn );
+    }
+    
+    /**
+     * Formats a record footer at the current position of the given buffer: the checksum followed
+     * by the footer magic number.
+     *
+     * @param buffer buffer the footer is written into
+     * @param checksum checksum value stored in the footer
+     */
+    private void writeFooter ( ByteBuffer buffer, int checksum )
+    {
+        buffer.putInt( checksum )
+            .putInt( LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER );
+    }
+    
+    
+    /**
+     * Groups the in-memory log buffer together with the cursors used to append to it and to flush it.
+     */
+    private static class LogBuffer
+    {
+        /** Length value marking a record that should be skipped at the end of the log buffer */
+        final static int SKIP_RECORD_LENGTH = -1;
+
+        /** In memory buffer holding the appended records */
+        byte[] buffer;
+
+        /** Cursor over the buffer used while scanning it for a flush */
+        ByteBuffer readHead;
+
+        /** Offset in the buffer of the first byte not yet flushed; advanced as readHead flushes data */
+        int readHeadPosition;
+
+        /** Rewind count of readHead, used to avoid overwriting nonflushed data */
+        AtomicInteger readHeadRewindCount;
+
+        /** Cursor over the buffer used while appending records into it */
+        ByteBuffer writeHead;
+
+        /** Rewind count of writeHead, used to avoid overwriting nonflushed data */
+        int writeHeadRewindCount;
+
+        /** Scratch space for the header and footer when a user buffer is written directly */
+        byte[] headerFooterBuffer;
+
+        /** Cursor used to format the header/footer scratch space */
+        ByteBuffer headerFooterHead;
+
+        /**
+         * Allocates the log buffer and the header/footer scratch buffer and wraps each of them
+         * with its cursors.
+         *
+         * @param bufferSize size of the in-memory log buffer in bytes
+         * @param currentLogFile not used by this constructor
+         */
+        public LogBuffer( int bufferSize, LogFileManager.LogFileWriter currentLogFile )
+        {
+            buffer = new byte[bufferSize];
+            headerFooterBuffer = new byte[LogFileRecords.MAX_MARKER_SIZE];
+
+            // Read and write cursors share the same backing array
+            readHead = ByteBuffer.wrap( buffer );
+            writeHead = ByteBuffer.wrap( buffer );
+            headerFooterHead = ByteBuffer.wrap( headerFooterBuffer );
+
+            readHeadRewindCount = new AtomicInteger( 0 );
+        }
+    }
+    
+    /**
+     * Used to group the flush related data together. The flush code visible in this class
+     * updates these fields while holding flushLock.
+     */
+    private static class FlushStatus
+    {
+        /** whether flush is going on; reset to false when a flush completes */
+        boolean flushInProgress;
+        
+        /** Current flush request; bumped up to flushedLSN if the flushed lsn gets ahead of it */
+        long uptoLSN;
+        
+        
+        /** Current flushed lsn; updated after the log file has been synced */
+        long flushedLSN;
+        
+        
+        /** Keeps track of the number of waiters; when non-zero, the flush condition is signalled at the end of a flush */
+        int numWaiters;
+    }
+}

Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java?rev=1177627&view=auto
==============================================================================
--- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java (added)
+++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java Fri Sep 30 12:28:59 2011
@@ -0,0 +1,494 @@
+
+package org.apache.directory.server.log.impl;
+
+import java.nio.ByteBuffer;
+
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.EOFException;
+
+import org.apache.directory.server.i18n.I18n;
+import org.apache.directory.server.log.InvalidLogException;
+import org.apache.directory.server.log.LogAnchor;
+import org.apache.directory.server.log.LogAnchorComparator;
+import org.apache.directory.server.log.LogScanner;
+import org.apache.directory.server.log.UserLogRecord;
+
+class LogManager
+{
+   
+    /**
+     * Controlfile record size in bytes. The record is made of five longs (min existing log file,
+     * min needed log file, min needed log file offset, min needed LSN, checksum) followed by the
+     * int magic number, that is 5 * 8 + 4 = 44 bytes. A smaller value makes the ByteBuffer wrapped
+     * around the control file buffer overflow on write and underflow on read.
+     */
+    private final static int CONTROLFILE_RECORD_SIZE = 5 * 8 + 4;
+    
+    /** Controlfile file magic number */
+    private final static int CONTROLFILE_MAGIC_NUMBER = 0xFF11FF11;
+    
+    /** Controlfile log file number */
+    private final static long CONTROLFILE_LOG_FILE_NUMBER = -1;
+    
+    /** Shadow Controlfile log file number */
+    private final static long CONTROLFILE_SHADOW_LOG_FILE_NUMBER = -2;
+    
+    /** Buffer used to do IO on the control file; holds exactly one control file record */
+    byte controlFileBuffer[] = new byte[CONTROLFILE_RECORD_SIZE];
+    
+    /** ByteBuffer wrapper around controlFileBuffer, used to format and parse the control file record */
+    ByteBuffer controlFileMarker = ByteBuffer.wrap( controlFileBuffer );
+    
+    /** Current checkpoint record in memory */
+    ControlFileRecord controlFileRecord = new ControlFileRecord();
+    
+    /** Min needed point in the log */
+    LogAnchor minLogAnchor = new LogAnchor();
+    
+    /** Protects minLogAnchor */
+    Lock minLogAnchorLock = new ReentrantLock();
+    
+    /** Log file manager */
+    LogFileManager logFileManager;
+        
+    /** Log Anchor comparator */
+    LogAnchorComparator anchorComparator = new LogAnchorComparator();
+    
+    /** Number of the current log file */
+    private long currentLogFileNumber;
+    
+    /** Buffer used to format the log file header written by createNextLogFile */
+    byte markerBuffer[] = new byte[LogFileRecords.LOG_FILE_HEADER_SIZE];
+    
+    /** ByteBuffer wrapper for the marker buffer */
+    ByteBuffer markerHead = ByteBuffer.wrap( markerBuffer );
+    
+    
+    /**
+     * Creates a log manager on top of the given log file manager. No IO is done here;
+     * initLogManager() reads the control file and scans the logs.
+     *
+     * @param logFileManager log file manager used for all log file and control file operations
+     */
+    public LogManager( LogFileManager logFileManager )
+    {
+        this.logFileManager = logFileManager;
+    }
+    
+    /**
+     * Initializes the log management:
+     * 1) Checks if the control file exists and creates it if necessary. If it exists, it is read and the
+     * latest checkpoint is loaded.
+     * 2) Starts from the latest checkpoint and scans the logs forwards to check for corrupted logs and
+     * to determine the end of the log. This scan ends either when a properly ended log file is found
+     * or when a partially written log record is found.
+     *
+     * The two cases are mutually exclusive: the first log file and the control file are only
+     * (re)created when no control file was found.
+     *
+     * @throws IOException
+     * @throws InvalidLogException if the control file or the log content is found to be invalid
+     */
+    public void initLogManager() throws IOException, InvalidLogException
+    {
+        LogAnchor scanPoint = new LogAnchor();
+        LogScanner scanner;
+        UserLogRecord logRecord;
+        LogFileManager.LogFileReader reader;
+
+        // Read and verify control file
+        boolean controlFileExists = true;
+
+        try
+        {
+            this.readControlFile();
+        }
+        catch( FileNotFoundException e )
+        {
+            controlFileExists = false;
+        }
+
+        if ( controlFileExists )
+        {
+            boolean invalidLog = false;
+
+            // Set the min log anchor from the control file
+            minLogAnchor.resetLogAnchor( controlFileRecord.minNeededLogFile,
+                    controlFileRecord.minNeededLogFileOffset, controlFileRecord.minNeededLSN );
+
+            scanPoint.resetLogAnchor( minLogAnchor );
+
+            logRecord = new UserLogRecord();
+            scanner = new DefaultLogScanner( scanPoint, logFileManager );
+
+            try
+            {
+                while ( scanner.getNextRecord( logRecord ) )
+                {
+                    // No need to do anything with the log record
+                }
+            }
+            catch( InvalidLogException e )
+            {
+                // Remember it: the tail of the log is repaired below
+                invalidLog = true;
+            }
+            finally
+            {
+                scanner.close();
+            }
+
+            long lastGoodLogFileNumber = scanner.getLastGoodFileNumber();
+            long lastGoodLogFileOffset = scanner.getLastGoodOffset();
+            currentLogFileNumber = lastGoodLogFileNumber;
+
+            if ( ( lastGoodLogFileNumber < LogAnchor.MIN_LOG_NUMBER ) ||
+                ( lastGoodLogFileOffset < LogAnchor.MIN_LOG_OFFSET ))
+            {
+                throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+            }
+
+            scanPoint.resetLogAnchor( lastGoodLogFileNumber, lastGoodLogFileOffset,
+                    LogAnchor.UNKNOWN_LSN );
+
+            // The end of the log cannot be before the checkpoint
+            if ( anchorComparator.compare( scanPoint, minLogAnchor ) < 0 )
+            {
+                throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+            }
+
+            /*
+             * If invalid content at the end of file:
+             * if we are past the header of file, then
+             * truncate the file to the end of the last
+             * read log record, otherwise we read a partially
+             * written log file header, in this case reformat the log file.
+             * Also check next for the existence of next file to make
+             * sure we really read the last log file.
+             */
+            if ( invalidLog )
+            {
+                // Check if next log file exists
+                reader = null;
+
+                try
+                {
+                    reader = logFileManager.getReaderForLogFile( ( lastGoodLogFileNumber + 1 ) );
+                }
+                catch ( FileNotFoundException e )
+                {
+                    // Fine, this is what we want
+                }
+                finally
+                {
+                    if ( reader != null )
+                    {
+                        reader.close();
+                    }
+                }
+
+                if ( reader != null )
+                {
+                    // Invalid content was found in a file that is not the last log file
+                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+                }
+
+                if  ( lastGoodLogFileOffset >= LogFileRecords.LOG_FILE_HEADER_SIZE  )
+                {
+                    logFileManager.truncateLogFile( lastGoodLogFileNumber, lastGoodLogFileOffset );
+                }
+                else
+                {
+                    // Reformat the existing log file
+                    this.createNextLogFile( true);
+                }
+            }
+        }
+        else
+        {
+            /*
+             * Control file does not exist. Either we are at the very beginning or
+             * maybe we crashed in the middle of creating the first log file.
+             * We  should have the min log file at most with the file header formatted.
+             */
+            reader = null;
+            boolean fileExists = false;
+            currentLogFileNumber = LogAnchor.MIN_LOG_NUMBER;
+
+            try
+            {
+                reader = logFileManager.getReaderForLogFile( LogAnchor.MIN_LOG_NUMBER );
+
+                if ( reader.getLength() > LogFileRecords.LOG_FILE_HEADER_SIZE )
+                {
+                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+                }
+
+                fileExists = true;
+            }
+            catch ( FileNotFoundException e )
+            {
+                // Fine, we will create the file
+            }
+            finally
+            {
+                if ( reader != null )
+                {
+                    reader.close();
+                }
+            }
+
+            this.createNextLogFile( fileExists );
+
+            // Prepare the min log anchor and control file and write the control file
+            minLogAnchor.resetLogAnchor( LogAnchor.MIN_LOG_NUMBER, LogFileRecords.LOG_FILE_HEADER_SIZE, LogAnchor.UNKNOWN_LSN );
+
+            this.writeControlFile();
+        }
+    }
+    
+    /**
+     * Called by LogFlushManager to switch to the next file.
+     *
+     * When a current writer is given, it is closed, the control file is written and the next log file
+     * is created before a writer for it is handed out. When no writer is given, a writer for the
+     * current log file is returned as is.
+     *
+     * Note: Currently we do a checkpoint and delete unnecessary log files when we switch to a new file. Some
+     * of these tasks can be delegated to a background thread later.
+     *
+     * @param currentWriter current log file used by the flush manager. Null if the flush manager is just starting up.
+     * @return new log file to be used.
+     * @throws IOException
+     * @throws InvalidLogException
+     */
+    public LogFileManager.LogFileWriter switchToNextLogFile( LogFileManager.LogFileWriter currentWriter ) throws IOException, InvalidLogException
+    {
+        if ( currentWriter == null )
+        {
+            // Flush manager is starting up: nothing to close, keep using the current log file
+            return logFileManager.getWriterForLogFile( this.currentLogFileNumber );
+        }
+
+        currentWriter.close();
+        this.writeControlFile();
+        this.createNextLogFile( false );
+
+        return logFileManager.getWriterForLogFile( this.currentLogFileNumber );
+    }
+    
+    /**
+     * Called when the logging subsystem is notified about the minimum position
+     * in the log files that is needed. Log manager uses this information to advance
+     * its checkpoint and delete unnecessary log files.
+     *
+     * The min log anchor only moves forward: an anchor that does not compare greater
+     * than the current one is ignored, and so is a null anchor.
+     *
+     * @param newLogAnchor min needed log anchor
+     */
+    public void advanceMinLogAnchor( LogAnchor newLogAnchor )
+    {
+        if ( newLogAnchor == null )
+        {
+            return;
+        }
+
+        minLogAnchorLock.lock();
+
+        try
+        {
+            if ( anchorComparator.compare( minLogAnchor, newLogAnchor ) < 0 )
+            {
+                minLogAnchor.resetLogAnchor( newLogAnchor );
+            }
+        }
+        finally
+        {
+            // Always release the lock, even if the comparison or the reset throws
+            minLogAnchorLock.unlock();
+        }
+    }
+    
+    /**
+     * Writes the control file. To make partially written control files unlikely,
+     * data is first written to a shadow file and then moved(renamed) to the controlfile.
+     * Move of a file is atomic in POSIX systems, in GFS like file systems(in HDFS for example).
+     * On windows, it is not always atomic but atomic versions of rename operations started to
+     * appear in their recent file systems.
+     *
+     * Before the record is written, log files that are below the min needed log file are deleted
+     * and the min existing log file is advanced accordingly.
+     *
+     * @throws IOException
+     */
+    private void writeControlFile() throws IOException
+    {
+        // Copy the min log file anchor
+        minLogAnchorLock.lock();
+
+        try
+        {
+            controlFileRecord.minNeededLogFile = minLogAnchor.getLogFileNumber();
+            controlFileRecord.minNeededLogFileOffset = minLogAnchor.getLogFileOffset();
+            controlFileRecord.minNeededLSN = minLogAnchor.getLogLSN();
+        }
+        finally
+        {
+            // Always release the lock, even if reading the anchor throws
+            minLogAnchorLock.unlock();
+        }
+
+        if ( controlFileRecord.minNeededLogFile > controlFileRecord.minExistingLogFile  )
+        {
+            this.deleteUnnecessaryLogFiles( controlFileRecord.minExistingLogFile,controlFileRecord.minNeededLogFile );
+            controlFileRecord.minExistingLogFile = controlFileRecord.minNeededLogFile;
+        }
+
+        // TODO compute checksum for log record here
+
+        // Format the record: five longs followed by the magic number
+        controlFileMarker.rewind();
+        controlFileMarker.putLong( controlFileRecord.minExistingLogFile );
+        controlFileMarker.putLong( controlFileRecord.minNeededLogFile );
+        controlFileMarker.putLong( controlFileRecord.minNeededLogFileOffset );
+        controlFileMarker.putLong( controlFileRecord.minNeededLSN );
+        controlFileMarker.putLong( controlFileRecord.checksum );
+        controlFileMarker.putInt( CONTROLFILE_MAGIC_NUMBER );
+
+        boolean shadowFileExists = logFileManager.createLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER  );
+
+        if ( shadowFileExists )
+        {
+            // Leftover shadow file: start from an empty file
+            logFileManager.truncateLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER, 0 );
+        }
+
+        LogFileManager.LogFileWriter controlFileWriter = logFileManager.getWriterForLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER );
+
+        try
+        {
+            controlFileWriter.append( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE);
+            controlFileWriter.sync();
+        }
+        finally
+        {
+            controlFileWriter.close();
+        }
+
+        // Do the move now
+        logFileManager.rename( CONTROLFILE_SHADOW_LOG_FILE_NUMBER , CONTROLFILE_LOG_FILE_NUMBER );
+    }
+    
+    /**
+     * Reads the control file into the in-memory control file record and verifies it:
+     * the log file numbers and the offset must not be below their minimum values, the min existing
+     * log file must not be past the min needed log file and the magic number must match.
+     *
+     * @throws IOException
+     * @throws InvalidLogException if the control file is too short or fails verification
+     * @throws FileNotFoundException
+     */
+    private void readControlFile() throws IOException, InvalidLogException, FileNotFoundException
+    {
+        LogFileManager.LogFileReader controlFileReader = logFileManager.getReaderForLogFile( CONTROLFILE_LOG_FILE_NUMBER );
+
+        try
+        {
+            controlFileReader.read( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE );
+        }
+        catch( EOFException e )
+        {
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) , e);
+        }
+        finally
+        {
+            controlFileReader.close();
+        }
+
+        // Parse the record in the order it was written
+        controlFileMarker.rewind();
+        controlFileRecord.minExistingLogFile = controlFileMarker.getLong();
+        controlFileRecord.minNeededLogFile = controlFileMarker.getLong();
+        controlFileRecord.minNeededLogFileOffset = controlFileMarker.getLong();
+        controlFileRecord.minNeededLSN = controlFileMarker.getLong();
+        controlFileRecord.checksum = controlFileMarker.getLong();
+        int magicNumber = controlFileMarker.getInt();
+
+        boolean validControlFile =
+            ( controlFileRecord.minExistingLogFile >= LogAnchor.MIN_LOG_NUMBER ) &&
+            ( controlFileRecord.minNeededLogFile >= LogAnchor.MIN_LOG_NUMBER ) &&
+            ( controlFileRecord.minNeededLogFileOffset >= LogAnchor.MIN_LOG_OFFSET ) &&
+            ( controlFileRecord.minExistingLogFile <= controlFileRecord.minNeededLogFile ) &&
+            ( magicNumber == CONTROLFILE_MAGIC_NUMBER );
+
+        // TODO compute and compare checksum
+
+        if ( !validControlFile )
+        {
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+        }
+    }
+    
+    /**
+     * Creates the next log file. If the log file already exists, then it is reformatted, that is,
+     * it is truncated and its file header is written again. On success the created (or reformatted)
+     * file becomes the current log file.
+     *
+     * @param reformatExistingFile log file already exists and should be formatted. If false, log file should not exist.
+     * @throws IOException
+     * @throws InvalidLogException if the existence of the file does not match what the caller expected
+     */
+    private void createNextLogFile( boolean reformatExistingFile ) throws IOException, InvalidLogException
+    {
+        // Reformatting targets the current file, otherwise move on to the next number
+        long logFileNumber = reformatExistingFile ? this.currentLogFileNumber : this.currentLogFileNumber + 1;
+
+        // Try to create the file.
+        boolean fileAlreadyExists = logFileManager.createLogFile( logFileNumber );
+
+        if ( reformatExistingFile != fileAlreadyExists )
+        {
+            // Either a new file was expected but one is already around,
+            // or an existing file was to be reformatted but there was none
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+        }
+
+        if ( reformatExistingFile )
+        {
+            logFileManager.truncateLogFile( logFileNumber, LogAnchor.MIN_LOG_OFFSET );
+        }
+
+        LogFileManager.LogFileWriter writer = logFileManager.getWriterForLogFile( logFileNumber );
+
+        try
+        {
+            // File header: log file number followed by the header magic number
+            markerHead.rewind();
+            markerHead.putLong( logFileNumber );
+            markerHead.putInt( LogFileRecords.LOG_FILE_HEADER_MAGIC_NUMBER );
+            writer.append( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE );
+            writer.sync();
+        }
+        finally
+        {
+            writer.close();
+        }
+
+        this.currentLogFileNumber = logFileNumber;
+    }
+    
+    /**
+     * Deletes the log files numbered from startingLogFileNumber (inclusive) up to
+     * endingLogFileNumber (exclusive).
+     *
+     * @param startingLogFileNumber first log file to delete
+     * @param endingLogFileNumber first log file to keep
+     */
+    private void deleteUnnecessaryLogFiles( long startingLogFileNumber, long endingLogFileNumber )
+    {
+        long fileToDelete = startingLogFileNumber;
+
+        while ( fileToDelete < endingLogFileNumber )
+        {
+            // Do a best effort delete
+            logFileManager.deleteLogFile( fileToDelete );
+            fileToDelete++;
+        }
+    }
+    
+    /**
+     * Checkpoint record: in-memory image of the content of the control file.
+     * It is a plain data holder that needs no access to the enclosing LogManager,
+     * hence it is a static nested class.
+     */
+     private static class ControlFileRecord
+     {
+         /** Lowest numbered log file that has not been deleted yet */
+         long minExistingLogFile;
+
+         /** Log file number of the min needed point in the log */
+         long minNeededLogFile;
+
+         /** Offset of the min needed point within minNeededLogFile */
+         long minNeededLogFileOffset;
+
+         /** LSN of the min needed point in the log */
+         long minNeededLSN;
+
+         /** Checksum of the record; stored and loaded but not computed or verified yet (see the TODOs) */
+         long checksum;
+     }
+     
+    
+      
+}



Re: svn commit: r1177627 - in /directory/apacheds/trunk: i18n/src/main/java/org/apache/directory/server/i18n/ i18n/src/main/resources/org/apache/directory/server/i18n/ xdbm-partition/src/main/java/org/apache/directory/server/log/ xdbm-partition/src/main/ja...

Posted by Felix Knecht <fe...@apache.org>.
Hi Selcuk

Please remember to also add a license header when adding new files to
the repository. For the files below, this has been done in
http://svn.apache.org/viewvc?rev=1177780&view=rev

Regards
Felix

On 09/30/2011 02:29 PM, saya@apache.org wrote:
> Author: saya
> Date: Fri Sep 30 12:28:59 2011
> New Revision: 1177627
>
> URL: http://svn.apache.org/viewvc?rev=1177627&view=rev
> Log:
> First set of changes towards a transactional system over partitions.
>
> This is the first set of changes for the logging subsystem.:
> Log.java: interface to the logging subsystem. Logging subsystem logs uninterpreted blobs of user data.
>
> LogManager.java: Manages deletion, creation and formatting of log files and reading/writing of control file. ControlFile is a special file where critical information for the logging subsystem is held. It includes checkpoint information. Control file is updated by writing to a shadow file first and then renaming(moving) the shadow file to the controlfile.
>
> LogFlushManager.java: Manages logging of data to log files. Manages an internal circular buffer where user data is appended first and then flushed to the underlying log file either when the buffer is full or when one of the clients wants to sync the data. It is possible to specify a zero for the buffer size and directly append the data to the underlying file. Log flushing is done by the foreground threads (no special background thread is used to do it).
>
> LogScanner.java, DefaultLogScanner.java: Implements forwards scanning of log files.
>
> LogFileManager.java: defines an interface that exposes basic file operations on log files. It is possible to use different implementations to satisfy this interface. Currently, randomaccessfile is used but FileChannel or HDFS files can be used as well.
>
> UserLogRecord.java: used to pass byte[] user log records and log position information between the clients and the logging subsystem.
>
> LogFile format: Log file numbers start from 0 and keep increasing as new log files are created. A log file has a log file header which should be verified when the log file is opened for reading. User log records are uninterpreted blobs of data. A record header is prepended and a record footer is appended to the user log record. The record header and footer also carry verification information to detect corrupt/partially written log records. Record header/footer and file headers are used by the LogScanner implementation to scan the logs. This format makes it possible to log any data in the logging subsystem.
>
>
>
> TODO: test, handle IO exceptions and other exceptions better.
>
> Added:
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
>      directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java
> Modified:
>      directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java
>      directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties
>
> Modified: directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java?rev=1177627&r1=1177626&r2=1177627&view=diff
> ==============================================================================
> --- directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java (original)
> +++ directory/apacheds/trunk/i18n/src/main/java/org/apache/directory/server/i18n/I18n.java Fri Sep 30 12:28:59 2011
> @@ -778,7 +778,12 @@ public enum I18n
>       ERR_742_CANNOT_ENCODE_ENC_TICKET_PART("ERR_742_CANNOT_ENCODE_ENC_TICKET_PART"),
>       ERR_743_CANNOT_ENCODE_TYPED_DATA("ERR_743_CANNOT_ENCODE_TYPED_DATA"),
>       ERR_744_NULL_PDU_LENGTH("ERR_744_NULL_PDU_LENGTH"),
> -    ERR_745_NOT_A_KERBEROS_STRING("ERR_745_NOT_A_KERBEROS_STRING");
> +    ERR_745_NOT_A_KERBEROS_STRING("ERR_745_NOT_A_KERBEROS_STRING"),
> +    ERR_746("ERR_746"),
> +    ERR_747("ERR_747"),
> +    ERR_748("ERR_748"),
> +    ERR_749("ERR_749"),
> +    ERR_750("ERR_750");
>
>       private static ResourceBundle errBundle = ResourceBundle
>                   .getBundle( "org.apache.directory.server.i18n.errors" );
>
> Modified: directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties?rev=1177627&r1=1177626&r2=1177627&view=diff
> ==============================================================================
> --- directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties (original)
> +++ directory/apacheds/trunk/i18n/src/main/resources/org/apache/directory/server/i18n/errors.properties Fri Sep 30 12:28:59 2011
> @@ -766,4 +766,9 @@ ERR_741_CANNOT_ENCODE_KRB_CRED=Cannot en
>   ERR_742_CANNOT_ENCODE_ENC_TICKET_PART=Cannot encode the EncTicketPart object, the PDU size is {0} when only {1} bytes has been allocated
>   ERR_743_CANNOT_ENCODE_TYPED_DATA=Cannot encode the TypedData object, the PDU size is {0} when only {1} bytes has been allocated
>   ERR_744_NULL_PDU_LENGTH=The PDU length is null, this is not allowed
> -ERR_745_NOT_A_KERBEROS_STRING=The value {0} is not a valid KerberosString
> \ No newline at end of file
> +ERR_745_NOT_A_KERBEROS_STRING=The value {0} is not a valid KerberosString
> +ERR_746=Not a valid log file number  {0}
> +ERR_747=Not a valid log file offset  {0}
> +ERR_748=Invalid log file bufferSize/ max size is sepcified bufferSize {0} logFileSize {0}
> +ERR_749=Log Scanner is already closed
> +ERR_750=Log content is invalid
> \ No newline at end of file
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/InvalidLogException.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,28 @@
> +
> +package org.apache.directory.server.log;
> +
> +/**
> + * An exception thrown when the content of the log is found to be invalid.
> + *
> + * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
> + */
> +public class InvalidLogException extends Exception
> +{
> +    /** Exception is Serializable, so pin the serialized form explicitly */
> +    private static final long serialVersionUID = 1L;
> +
> +
> +    /**
> +     * Creates an exception with neither a message nor a cause.
> +     */
> +    public InvalidLogException()
> +    {
> +        super();
> +    }
> +
> +
> +    /**
> +     * Creates an exception with the given message.
> +     *
> +     * @param s the detail message
> +     */
> +    public InvalidLogException( String s )
> +    {
> +        super( s );
> +    }
> +
> +
> +    /**
> +     * Creates an exception wrapping the given cause.
> +     *
> +     * @param cause the underlying cause
> +     */
> +    public InvalidLogException( Throwable cause )
> +    {
> +        super( cause );
> +    }
> +
> +
> +    /**
> +     * Creates an exception with the given message and cause.
> +     *
> +     * @param s the detail message
> +     * @param cause the underlying cause
> +     */
> +    public InvalidLogException( String s, Throwable cause )
> +    {
> +        super( s, cause );
> +    }
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,40 @@
> +
> +package org.apache.directory.server.log;
> +
> +import java.io.IOException;
> +
> +public interface Log
> +{
> +
> +    /**
> +     * Initializes the logging subsystem.
> +     *
> +     * @param logFilepath log file path
> +     * @param suffix suffix for log file.
> +     * @param logBufferSize size of the buffer that will hold unflushed log changes. Specify zero if no buffering is desired
> +     * @param logFileSize a soft limit on the log file size
> +     */
> +   public void init( String logFilepath, String suffix, int logBufferSize, long logFileSize );
> +
> +    /**
> +     * Logs the given user record to the log. The position in the log files where the record is logged is
> +     * returned as part of userRecord.
> +     *
> +     * @param userRecord provides the user data to be logged
> +     * @param sync if true, this call returns after making sure that the appended data is reflected to the underlying media
> +     * @throws IOException if an I/O error occurs while logging
> +     * @throws InvalidLogException if the log content is found to be invalid
> +     */
> +    public void log( UserLogRecord userRecord, boolean sync ) throws IOException, InvalidLogException;
> +
> +
> +    /**
> +     * Starts a scan of the logs, beginning at the given log position.
> +     *
> +     * @param startPoint starting position of the scan.
> +     * @return a scanner that reads log records starting from startPoint
> +     */
> +    public LogScanner beginScan( LogAnchor startPoint );
> +
> +
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchor.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,83 @@
> +
> +package org.apache.directory.server.log;
> +
> +import org.apache.directory.server.i18n.I18n;
> +
> +/**
> + * Implements a pointer in to the log files
> + * @author<a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
> + */
> +public class LogAnchor
> +{
> +    // TODO move this to logger
> +    /** Invalid/unknown lsn. Log LSN starts at UNKNOWN_LSN + 1 and is ever increasing */
> +    public final static long UNKNOWN_LSN = Long.MIN_VALUE;
> +
> +    /** Min log file number */
> +    public final static long MIN_LOG_NUMBER = 0;
> +
> +    /** Min log file offset */
> +    public final static long MIN_LOG_OFFSET = 0;
> +
> +
> +    /** log file identifier of the anchor */
> +    private long logFileNumber = 0 ;
> +
> +    /** Offset into the log file identified by logfilenumber */
> +    private long logFileOffset = 0;
> +
> +    /** LSN corresponding to the logFileNumber and fileOffset */
> +    private long logLSN = UNKNOWN_LSN;
> +
> +    public LogAnchor()
> +    {
> +
> +    }
> +
> +    public LogAnchor( long logFileNumber, long logFileOffset, long logLSN )
> +    {
> +        this.resetLogAnchor( logFileNumber, logFileOffset, logLSN );
> +    }
> +
> +
> +    public void resetLogAnchor( long logFileNumber, long logFileOffset, long logLSN )
> +    {
> +        if ( logFileNumber<  0 )
> +        {
> +            throw new IllegalArgumentException( I18n.err( I18n.ERR_746, logFileNumber ) );
> +        }
> +
> +        if ( logFileOffset<  0 )
> +        {
> +            throw new IllegalArgumentException( I18n.err( I18n.ERR_747, logFileOffset ) );
> +        }
> +
> +
> +        this.logFileNumber = logFileNumber;
> +        this.logFileOffset = logFileOffset;
> +        this.logLSN = logLSN;
> +    }
> +
> +    public void resetLogAnchor( LogAnchor logAnchor )
> +    {
> +        this.resetLogAnchor( logAnchor.getLogFileNumber(), logAnchor.getLogFileOffset(), logAnchor.getLogLSN() );
> +    }
> +
> +
> +    public long getLogFileNumber()
> +    {
> +        return this.logFileNumber;
> +    }
> +
> +
> +    public long getLogFileOffset()
> +    {
> +        return this.logFileOffset;
> +    }
> +
> +
> +    public long getLogLSN()
> +    {
> +        return this.logLSN;
> +    }
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogAnchorComparator.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,57 @@
> +
> +package org.apache.directory.server.log;
> +
> +import java.util.Comparator;
> +
> +import org.apache.directory.server.i18n.I18n;
> +
> +public class LogAnchorComparator implements Comparator<LogAnchor>
> +{
> +    /**
> +     * Orders two log anchors by log file number first and, within the same
> +     * file, by log file offset. The LSN of the anchors is not looked at.
> +     *
> +     * @param obj1 First object
> +     * @param obj2 Second object
> +     * @return 1 if obj1 is after obj2, 0 if both point to the same position,
> +     *         and -1 if obj1 is before obj2
> +     * @throws IllegalArgumentException if either anchor is null
> +     */
> +    public int compare( LogAnchor obj1, LogAnchor obj2 )
> +    {
> +        if ( obj1 == null )
> +        {
> +            throw new IllegalArgumentException( I18n.err( I18n.ERR_525 ) );
> +        }
> +
> +        if ( obj2 == null )
> +        {
> +            throw new IllegalArgumentException( I18n.err( I18n.ERR_526 ) );
> +        }
> +
> +        int byFileNumber = compareLongs( obj1.getLogFileNumber(), obj2.getLogFileNumber() );
> +
> +        if ( byFileNumber != 0 )
> +        {
> +            return byFileNumber;
> +        }
> +
> +        // Same file: the offset decides
> +        return compareLongs( obj1.getLogFileOffset(), obj2.getLogFileOffset() );
> +    }
> +
> +
> +    /**
> +     * Three-way comparison of two longs, without the overflow risk of a subtraction.
> +     */
> +    private static int compareLongs( long left, long right )
> +    {
> +        if ( left == right )
> +        {
> +            return 0;
> +        }
> +
> +        return ( left > right ) ? 1 : -1;
> +    }
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/LogScanner.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,42 @@
> +
> +package org.apache.directory.server.log;
> +
> +
> +import java.io.IOException;
> +
> +public interface LogScanner
> +{
> +    /**
> +     * Reads the next user record from the log into the given log record.
> +     * The byte array backing the record can be overwritten by the next
> +     * call to getNextRecord().
> +     *
> +     * @param logRecord log record to be filled in with the next user record
> +     * @return true if there is a next record
> +     * @throws IOException if an I/O error occurs while reading the log
> +     * @throws InvalidLogException thrown if the log content is invalid
> +     */
> +    public boolean getNextRecord(UserLogRecord logRecord) throws IOException, InvalidLogException;
> +
> +
> +    /**
> +     * Returns the last successfully read log file number
> +     *
> +     * @return last successfully read log file number
> +     */
> +    public long getLastGoodFileNumber();
> +
> +    /**
> +     * Returns the last known good offset in the last successfully read log file
> +     *
> +     * @return last known good log file offset
> +     */
> +    public long getLastGoodOffset();
> +
> +    /**
> +     * Closes the scanner and releases any
> +     * resources.
> +     *
> +     */
> +    public void close();
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,47 @@
> +
> +package org.apache.directory.server.log;
> +
> +/**
> + * A user log record that can be used to pass user records between the clients and the logger
> + *
> + * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
> + */
> +public class UserLogRecord
> +{
> +    private final static int INITIAL_SIZE =  1024;
> +
> +    /** array used to hold user log records */
> +    private byte[] recordHolder;
> +
> +    /** offset in the byte array where the user record starts */
> +    int offset;
> +
> +    /** length of the user record in the byte array */
> +    int length;
> +
> +    /** Position of the log record in the log */
> +    private LogAnchor logAnchor = new LogAnchor();
> +
> +
> +    /**
> +     * Sets the user data carried by this record. The array is stored by
> +     * reference, it is not copied.
> +     *
> +     * @param data array holding the user record
> +     * @param length number of bytes of data that make up the user record
> +     */
> +    public void setData( byte[] data, int length )
> +    {
> +        // Store the arguments: the field used to be assigned to itself and the
> +        // length was dropped, so the record never carried any data.
> +        this.recordHolder = data;
> +        this.length = length;
> +    }
> +
> +
> +    /**
> +     * @return the array holding the user record, null if no data was set
> +     */
> +    public byte[] getDataBuffer()
> +    {
> +        return recordHolder;
> +    }
> +
> +
> +    /**
> +     * @return the length of the user record in the data buffer
> +     */
> +    public int getDataLength()
> +    {
> +        return length;
> +    }
> +
> +
> +    /**
> +     * @return the anchor holding the position of this record in the log
> +     */
> +    public LogAnchor getLogAnchor()
> +    {
> +        return logAnchor;
> +    }
> +
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,243 @@
> +
> +package org.apache.directory.server.log.impl;
> +
> +import java.io.EOFException;
> +import java.io.File;
> +import java.io.IOException;
> +import java.io.FileNotFoundException;
> +
> +import java.io.RandomAccessFile;
> +
> +import org.apache.directory.server.i18n.I18n;
> +import org.apache.directory.server.log.impl.LogFileManager.LogFileReader;
> +import org.apache.directory.server.log.impl.LogFileManager.LogFileWriter;
> +
> +/**
> + * A {@link LogFileManager} that keeps each log file as a regular file and
> + * accesses it through a {@link RandomAccessFile}.
> + */
> +class DefaultLogFileManager implements LogFileManager
> +{
> +    /** Directory the log files live in */
> +    private String logFilePath;
> +
> +    /** Suffix (extension) of the log files */
> +    private String suffix;
> +
> +    /**
> +     * Inits the log file manager to use the given logfile path and the suffix. Each log file
> +     * is named log_<logFileNumber>.suffix
> +     *
> +     * @param logFilepath log file path
> +     * @param suffix suffix for log file.
> +     */
> +    public void init( String logFilepath, String suffix )
> +    {
> +        // The parameter is spelled logFilepath: assigning "logFilePath" would
> +        // assign the field to itself and leave the path null.
> +        this.logFilePath = logFilepath;
> +        this.suffix = suffix;
> +    }
> +
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public LogFileReader getReaderForLogFile( long logFileNumber ) throws IOException, FileNotFoundException
> +    {
> +        File logFile = makeLogFileName( logFileNumber );
> +
> +        // In "r" mode this throws a FileNotFoundException if the file does not exist
> +        RandomAccessFile raf = new RandomAccessFile( logFile, "r" );
> +
> +        return new LogFileReader( raf, logFileNumber );
> +    }
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public LogFileWriter getWriterForLogFile( long logFileNumber ) throws IOException, FileNotFoundException
> +    {
> +        File logFile = makeLogFileName( logFileNumber );
> +
> +        // In "rw" mode a missing file is created; a FileNotFoundException is
> +        // only thrown if the file cannot be opened or created.
> +        RandomAccessFile raf = new RandomAccessFile( logFile, "rw" );
> +
> +        return new LogFileWriter( raf, logFileNumber );
> +    }
> +
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public boolean createLogFile( long logFileNumber ) throws IOException
> +    {
> +        File logFile = makeLogFileName( logFileNumber );
> +
> +        // createNewFile() returns false when the file was already there
> +        boolean fileAlreadyExists = !logFile.createNewFile();
> +
> +        return fileAlreadyExists;
> +    }
> +
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public void truncateLogFile( long logFileNumber, long size ) throws IOException, FileNotFoundException
> +    {
> +        if ( size < 0 )
> +        {
> +            throw new IllegalArgumentException( "Invalid file size is specified for the log file: " + logFileNumber + " " + size );
> +        }
> +
> +        File logFile = makeLogFileName( logFileNumber );
> +
> +        // In "rw" mode a missing file is created; a FileNotFoundException is
> +        // only thrown if the file cannot be opened or created.
> +        RandomAccessFile raf = new RandomAccessFile( logFile, "rw" );
> +
> +        try
> +        {
> +            raf.setLength( size );
> +            raf.getFD().sync();
> +        }
> +        finally
> +        {
> +            // Always release the file handle, it used to be leaked
> +            raf.close();
> +        }
> +    }
> +
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public void deleteLogFile( long logFileNumber )
> +    {
> +        File logFile = makeLogFileName( logFileNumber );
> +
> +        // Best effort: the interface gives no way to report a failed delete
> +        logFile.delete();
> +    }
> +
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public boolean rename( long originalLogFileNumber, long newLongFileNumber )
> +    {
> +        File oldLogFile = makeLogFileName( originalLogFileNumber );
> +        File newLogFile = makeLogFileName( newLongFileNumber );
> +
> +        return oldLogFile.renameTo( newLogFile );
> +    }
> +
> +
> +    /**
> +     * Builds the file of the log with the given number:
> +     * logFilePath/log_<logFileNumber>.suffix
> +     */
> +    private File makeLogFileName( long logFileNumber )
> +    {
> +        return new File( logFilePath + "/" + LogFileManager.LOG_NAME_PREFIX + logFileNumber + "." + suffix );
> +    }
> +
> +
> +    /**
> +     * Log file reader backed by a RandomAccessFile.
> +     */
> +    static class LogFileReader implements LogFileManager.LogFileReader
> +    {
> +        /** Underlying log file */
> +        RandomAccessFile raf;
> +
> +        /** Log file identifier */
> +        long logFileNumber;
> +
> +
> +        public LogFileReader( RandomAccessFile raf, long logFileNumber )
> +        {
> +            this.raf = raf;
> +            this.logFileNumber = logFileNumber;
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public int read( byte[] buffer, int offset, int length ) throws IOException, EOFException
> +        {
> +            // readFully either reads exactly length bytes or throws an EOFException
> +            raf.readFully( buffer, offset, length );
> +            return length;
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public void seek( long position ) throws IOException
> +        {
> +            raf.seek( position );
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public void close() throws IOException
> +        {
> +            raf.close();
> +        }
> +
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public long logFileNumber()
> +        {
> +            return logFileNumber;
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public long getLength() throws IOException
> +        {
> +            return raf.length();
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public long getOffset() throws IOException
> +        {
> +            return raf.getFilePointer();
> +        }
> +    }
> +
> +
> +    /**
> +     * Log file writer backed by a RandomAccessFile.
> +     */
> +    static class LogFileWriter implements LogFileManager.LogFileWriter
> +    {
> +        /** Underlying log file */
> +        RandomAccessFile raf;
> +
> +        /** Log file identifier */
> +        long logFileNumber;
> +
> +
> +        public LogFileWriter( RandomAccessFile raf, long logFileNumber )
> +        {
> +            this.raf = raf;
> +            this.logFileNumber = logFileNumber;
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public void append( byte[] buffer, int offset, int length ) throws IOException
> +        {
> +            // Writes at the current file pointer of the underlying file
> +            raf.write( buffer, offset, length );
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public void sync() throws IOException
> +        {
> +            raf.getFD().sync();
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public void close() throws IOException
> +        {
> +            raf.close();
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public long logFileNumber()
> +        {
> +            return logFileNumber;
> +        }
> +
> +        /**
> +         * {@inheritDoc}
> +         */
> +        public long getLength() throws IOException
> +        {
> +            return raf.length();
> +        }
> +    }
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,354 @@
> +
> +package org.apache.directory.server.log.impl;
> +
> +import java.io.IOException;
> +import java.io.EOFException;
> +import java.io.FileNotFoundException;
> +import java.nio.ByteBuffer;
> +
> +import org.apache.directory.server.i18n.I18n;
> +import org.apache.directory.server.log.InvalidLogException;
> +import org.apache.directory.server.log.LogAnchor;
> +import org.apache.directory.server.log.LogScanner;
> +import org.apache.directory.server.log.UserLogRecord;
> +
> +public class DefaultLogScanner implements LogScanner
> +{
> +    /** LSN of the last successfully read log record */
> +    private long prevLSN = LogAnchor.UNKNOWN_LSN;
> +
> +    /** File number of the last successfully read position's file number */
> +    private long prevLogFileNumber = -1;
> +
> +    /** File number of the last known good offset */
> +    private long prevLogFileOffset = -1;
> +
> +    /** Position to read the next record from */
> +    private LogAnchor startingLogAnchor = new LogAnchor();
> +
> +    /** Last Read Lsn */
> +    private long lastReadLSN = LogAnchor.UNKNOWN_LSN;
> +
> +    /** Current log file pointer to read from */
> +    LogFileManager.LogFileReader currentLogFile;
> +
> +    /** True if scanner is closed */
> +    boolean closed = false;
> +
> +    /** True if scanner hit invalid content. No more reads will be done after invalid log content is hit */
> +    boolean invalidLog = false;
> +
> +    /** log file manager used to open files for reading */
> +    LogFileManager logFileManager;
> +
> +    /** Buffer used to read log file markers */
> +    byte markerBuffer[] = new byte[LogFileRecords.MAX_MARKER_SIZE];
> +
> +    /** ByteBuffer wrapper for the marker buffer */
> +    ByteBuffer markerHead = ByteBuffer.wrap( markerBuffer );
> +
> +    public DefaultLogScanner( LogAnchor startingLogAnchor, LogFileManager logFileManger )
> +    {
> +        startingLogAnchor.resetLogAnchor( startingLogAnchor );
> +        this.logFileManager = logFileManager;
> +    }
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public boolean getNextRecord(UserLogRecord logRecord) throws IOException, InvalidLogException
> +    {
> +        boolean startingRead = false;
> +
> +        checkIfClosed();
> +
> +        if ( invalidLog )
> +        {
> +            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +        }
> +
> +        long fileLength;
> +        long fileOffset;
> +
> +
> +        try
> +        {
> +            if ( currentLogFile == null )
> +            {
> +
> +                long startingOffset = startingLogAnchor.getLogFileOffset();
> +
> +                // Read and verify header
> +                currentLogFile = this.readFileHeader( startingLogAnchor.getLogFileNumber() );
> +
> +                if ( currentLogFile == null )
> +                {
> +                    return false; // Nothing to read
> +                }
> +
> +                if ( startingOffset>  0 )
> +                {
> +                    if ( startingOffset<  LogFileRecords.LOG_FILE_HEADER_SIZE )
> +                    {
> +                        // Offset should be at log file marker boundary
> +                        this.markScanInvalid();
> +                    }
> +
> +                    prevLogFileOffset = Math.max( startingOffset, currentLogFile.getLength() );
> +                    currentLogFile.seek( startingOffset );
> +                }
> +                startingRead = true;
> +            }
> +
> +            while ( true )
> +            {
> +                fileLength = currentLogFile.getLength();
> +                fileOffset = currentLogFile.getOffset();
> +
> +                if ( fileOffset>  fileLength )
> +                {
> +                    this.markScanInvalid();
> +                }
> +                else if ( fileOffset == fileLength )
> +                {
> +                    // Switch to next file.. This reads and verifies the header of the new file
> +                    long nextLogFileNumber = currentLogFile.logFileNumber() + 1;
> +                    currentLogFile.close();
> +                    currentLogFile = this.readFileHeader( nextLogFileNumber );
> +
> +                    if ( currentLogFile == null )
> +                        return false; // Done.. End of log stream
> +
> +
> +                }
> +                else
> +                {
> +                    break;  // break to read the user record
> +                }
> +            }
> +
> +            // Read and verify record header
> +            int recordLength = this.readRecordHeader();
> +
> +            // If starting read, then check if we have the expected lsn in case
> +            // expected lsn is known
> +            if ( startingRead )
> +            {
> +                long startingLSN = startingLogAnchor.getLogLSN();
> +
> +                if ( ( startingLSN != LogAnchor.UNKNOWN_LSN )&&  ( startingLSN != lastReadLSN ) )
> +                {
> +                    this.markScanInvalid();
> +                }
> +            }
> +
> +            // Read and verify user block
> +            this.readLogRecord( logRecord, recordLength );
> +
> +            // Read and verify footer
> +            this.readRecordFooter();
> +
> +
> +            // If we are here, then we successfully read the log record.
> +            // Set the read record's position, uptate last read good location
> +            // and then return
> +            fileOffset = currentLogFile.getOffset();
> +
> +            LogAnchor userLogAnchor = logRecord.getLogAnchor();
> +            userLogAnchor.resetLogAnchor( currentLogFile.logFileNumber(), fileOffset - recordLength, lastReadLSN );
> +
> +            prevLogFileOffset = fileOffset;
> +            prevLogFileNumber = currentLogFile.logFileNumber();
> +            prevLSN = lastReadLSN;
> +        }
> +        catch( EOFException e)
> +        {
> +            // This means either the log record or the log file header was
> +            // partially written. Treat this as invalid log content
> +            this.markScanInvalid();
> +        }
> +
> +        return true;
> +
> +    }
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public long getLastGoodFileNumber()
> +    {
> +        return this.prevLogFileNumber;
> +    }
> +
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public long getLastGoodOffset()
> +    {
> +        return this.prevLogFileOffset;
> +    }
> +
> +
> +
> +    /**
> +     * {@inheritDoc}
> +     */
> +    public void close()
> +    {
> +        if ( closed == false )
> +        {
> +            closed = true;
> +            if (currentLogFile != null)
> +            {
> +                try
> +                {
> +                    currentLogFile.close();
> +                    currentLogFile = null;
> +                }
> +                catch( IOException e )
> +                {
> +                    // Ignore
> +                }
> +            }
> +        }
> +
> +    }
> +
> +
> +    private int readRecordHeader() throws IOException, InvalidLogException, EOFException
> +    {
> +        boolean invalid = false;
> +
> +        markerHead.rewind();
> +        currentLogFile.read( markerBuffer, 0, LogFileRecords.RECORD_HEADER_SIZE );
> +        int magicNumber = markerHead.getInt();
> +        int length = markerHead.getInt();
> +        long lsn = markerHead.getLong();
> +        long checksum = markerHead.getLong();
> +
> +        if ( magicNumber != LogFileRecords.RECORD_HEADER_MAGIC_NUMBER )
> +        {
> +            invalid = true;
> +        }
> +
> +        if ( length<= ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE ) )
> +        {
> +            invalid = true;
> +        }
> +
> +        if ( lsn<  prevLSN )
> +        {
> +            invalid = true;
> +        }
> +
> +        if ( checksum != ( lsn ^ length ) )
> +        {
> +            invalid = true;
> +        }
> +
> +        if ( invalid == true )
> +        {
> +            this.markScanInvalid();
> +        }
> +
> +        // Everything went fine
> +        lastReadLSN = lsn;
> +        return length;
> +    }
> +
> +
> +    private void readRecordFooter() throws IOException, InvalidLogException, EOFException
> +    {
> +        boolean invalid = false;
> +
> +        markerHead.rewind();
> +        currentLogFile.read( markerBuffer, 0, LogFileRecords.RECORD_FOOTER_SIZE );
> +        int checksum = markerHead.getInt();
> +        int magicNumber = markerHead.getInt();
> +
> +        if ( magicNumber != LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER )
> +        {
> +            invalid = true;
> +        }
> +
> +        // TODO compute checksum
> +
> +        if ( invalid == true )
> +        {
> +            this.markScanInvalid();
> +        }
> +    }
> +
> +    private void readLogRecord( UserLogRecord userRecord, int length ) throws IOException, EOFException
> +    {
> +        byte dataBuffer[] = userRecord.getDataBuffer();
> +
> +        if ( dataBuffer == null || dataBuffer.length<  length )
> +        {
> +            // Allocate a larger buffer
> +            dataBuffer = new byte[length];
> +        }
> +
> +        currentLogFile.read( dataBuffer, 0, length );
> +        userRecord.setData( dataBuffer, length );
> +    }
> +
> +    private LogFileManager.LogFileReader readFileHeader( long logFileNumber ) throws IOException, InvalidLogException, EOFException
> +    {
> +        boolean invalid = false;
> +        LogFileManager.LogFileReader logFile;
> +
> +        try
> +        {
> +            logFile = logFileManager.getReaderForLogFile( logFileNumber );
> +        }
> +        catch ( FileNotFoundException e )
> +        {
> +            return null; // end of log scan
> +        }
> +
> +        // File exists
> +        this.prevLogFileNumber = logFileNumber;
> +        this.prevLogFileOffset = 0;
> +
> +        markerHead.rewind();
> +        currentLogFile.read( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE );
> +        long persistedLogFileNumber = markerHead.getLong();
> +        int magicNumber = markerHead.getInt();
> +
> +        if ( persistedLogFileNumber != logFileNumber )
> +        {
> +            invalid = true;
> +        }
> +
> +        if ( magicNumber != LogFileRecords.LOG_FILE_HEADER_MAGIC_NUMBER )
> +        {
> +            invalid = true;
> +        }
> +
> +
> +        if ( invalid == true )
> +        {
> +            this.markScanInvalid();
> +        }
> +
> +        // Everything is fine, advance good file offset and return
> +        this.prevLogFileOffset = LogFileRecords.LOG_FILE_HEADER_SIZE;
> +        return logFile;
> +    }
> +
> +    private void checkIfClosed()
> +    {
> +        if ( closed == true )
> +        {
> +            throw new IllegalStateException( I18n.err( I18n.ERR_749 ) );
> +        }
> +    }
> +
> +    private void markScanInvalid() throws InvalidLogException
> +    {
> +        invalidLog = true;
> +        throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +    }
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,175 @@
> +package org.apache.directory.server.log.impl;
> +
> +import java.io.IOException;
> +import java.io.EOFException;
> +import java.io.FileNotFoundException;
> +
> +
> +/**
> + * Defines an interface that the log manager can use to manage log files:
> + * creating, opening for read or append, truncating, deleting and renaming them.
> + * Log files are identified by a numeric log file number.
> + *
> + * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
> + */
> +interface LogFileManager
> +{
> +    /** Prefix of log file names (presumably used by implementations when building file names) */
> +    final static String LOG_NAME_PREFIX = "log_";
> +
> +    /**
> +     * Inits the log file manager to use the given logfile path and the suffix. Each log file
> +     * has name logFileName_&lt;logFileNumber&gt;.suffix
> +     *
> +     * @param logFilepath log file path
> +     * @param suffix suffix for log file.
> +     */
> +    public void init( String logFilepath, String suffix );
> +
> +
> +    /**
> +     * Returns a reader for the given log file number
> +     *
> +     * @param logFileNumber identifier of the log file to read
> +     * @return reader for the given logfile
> +     * @throws IOException on an I/O failure while opening the file
> +     * @throws FileNotFoundException if the log file with the given number does not exist
> +     */
> +    public LogFileReader getReaderForLogFile( long logFileNumber ) throws IOException, FileNotFoundException;
> +
> +
> +    /**
> +     * Returns a writer for the given log file number
> +     *
> +     * @param logFileNumber identifier of the log file to write to
> +     * @return writer for the given logfile
> +     * @throws IOException on an I/O failure while opening the file
> +     * @throws FileNotFoundException presumably if the log file cannot be found or opened for writing;
> +     *         confirm against the implementation
> +     */
> +    public LogFileWriter getWriterForLogFile( long logFileNumber ) throws IOException, FileNotFoundException;
> +
> +
> +    /**
> +     * Create a log file with the given identifier
> +     *
> +     * @param logFileNumber identifier of the log file to create.
> +     * @return true if file already existed
> +     * @throws IOException on an I/O failure while creating the file
> +     */
> +    public boolean createLogFile( long logFileNumber ) throws IOException;
> +
> +
> +    /**
> +     * Truncates the file to the given size. Mostly used for throwing away
> +     * junk at the end of a log file when the log is replayed after a crash.
> +     *
> +     * @param logFileNumber identifier of the log file
> +     * @param size new size of the file
> +     * @throws IOException on an I/O failure while truncating the file
> +     */
> +    public void truncateLogFile( long logFileNumber, long size ) throws IOException;
> +
> +    /**
> +     * Deletes the underlying log file.
> +     *
> +     * @param logFileNumber identifier of the log file
> +     */
> +    public void deleteLogFile( long logFileNumber );
> +
> +
> +    /**
> +     * Moves the old log file to a new name
> +     *
> +     * @param orignalLogFileNumber identifier of the old file
> +     * @param newLongFileNumber identifier of the new file
> +     * @return true if the rename succeeded
> +     */
> +    public boolean rename(long orignalLogFileNumber, long newLongFileNumber);
> +
> +
> +    /**
> +     * Sequential reader over a single log file, with the ability to reposition.
> +     */
> +    interface LogFileReader
> +    {
> +        /**
> +         * Reads from the file at the current position
> +         *
> +         * @param buffer data destination
> +         * @param offset destination offset
> +         * @param length size of read
> +         * @return number of bytes actually read.
> +         * @throws IOException on an I/O failure
> +         * @throws EOFException presumably when the end of the file is hit; confirm against the implementation
> +         */
> +        public int read( byte[] buffer, int offset, int length ) throws IOException, EOFException;
> +
> +
> +        /**
> +         * Repositions the reader at the given offset
> +         *
> +         * @param position offset in the file at which the next read starts
> +         * @throws IOException on an I/O failure
> +         */
> +        public void seek( long position ) throws IOException;
> +
> +        /**
> +         * Closes the log file reader and releases the resources
> +         *
> +         * @throws IOException on an I/O failure
> +         */
> +        public void close() throws IOException;
> +
> +
> +        /**
> +         * Each log file is assigned a sequence number. This method
> +         * returns that number
> +         *
> +         * @return number assigned to this log file
> +         */
> +        public long logFileNumber();
> +
> +        /**
> +         * Returns the length of the file
> +         *
> +         * @return length of the file
> +         * @throws IOException on an I/O failure
> +         */
> +        public long getLength() throws IOException;
> +
> +        /**
> +         * Returns the offset of the next read
> +         *
> +         * @return offset of the next read
> +         * @throws IOException on an I/O failure
> +         */
> +        public long getOffset() throws IOException;
> +    }
> +
> +    /**
> +     * Append-only writer over a single log file.
> +     */
> +    interface LogFileWriter
> +    {
> +        /**
> +         * Append the given data to the log file
> +         *
> +         * @param buffer source of data
> +         * @param offset offset into buffer
> +         * @param length number of bytes to be appended
> +         * @throws IOException on an I/O failure
> +         */
> +        public void append( byte[] buffer, int offset, int length ) throws IOException;
> +
> +
> +
> +        /**
> +         * Sync the file contents to media
> +         *
> +         * @throws IOException on an I/O failure
> +         */
> +        public void sync() throws IOException;
> +
> +        /**
> +         * Closes the log file writer and releases the resources
> +         *
> +         * @throws IOException on an I/O failure
> +         */
> +        public void close() throws IOException;
> +
> +        /**
> +         * Each log file is assigned a sequence number. This method
> +         * returns that number
> +         *
> +         * @return number assigned to this log file
> +         */
> +        public long logFileNumber();
> +
> +        /**
> +         * Returns the length of the file
> +         *
> +         * @return length of the file
> +         * @throws IOException on an I/O failure
> +         */
> +        public long getLength() throws IOException;
> +
> +    }
> +
> +}
> \ No newline at end of file
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileRecords.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,62 @@
> +
> +package org.apache.directory.server.log.impl;
> +
> +
> +public class LogFileRecords
> +{
> +    /*
> +     * Every user log record is wrapped in a header and a footer when it is
> +     * appended to the log. These markers make it possible to walk the log,
> +     * to detect where the log ends during a scan and to verify its integrity.
> +     */
> +
> +    /*
> +     * Record header layout:
> +     *   int  RECORD_HEADER_MAGIC_NUMBER
> +     *   int  length          length of header + user log record + length of footer
> +     *   long recordLSN       lsn of the log record
> +     *   long headerChecksum  checksum to verify header
> +     */
> +
> +    /** Magic number opening every record header */
> +    static final int RECORD_HEADER_MAGIC_NUMBER = 0x010F010F;
> +
> +    /** Total record header size: two ints followed by two longs */
> +    static final int RECORD_HEADER_SIZE = 4 + 4 + 8 + 8;
> +
> +    /*
> +     * Record footer layout:
> +     *   int checksum
> +     *   int RECORD_FOOTER_MAGIC_NUMBER
> +     */
> +
> +    /** Magic number closing every record footer */
> +    static final int RECORD_FOOTER_MAGIC_NUMBER = 0x0F010F01;
> +
> +    /** Total record footer size: two ints */
> +    static final int RECORD_FOOTER_SIZE = 4 + 4;
> +
> +    /*
> +     * Log file header layout:
> +     *   long log file number
> +     *   int  LOG_FILE_HEADER_MAGIC_NUMBER 0xFF00FF00
> +     */
> +
> +    /** Log file header size: one long followed by one int */
> +    static final int LOG_FILE_HEADER_SIZE = 8 + 4;
> +
> +    /** Magic number stored in the log file header */
> +    static final int LOG_FILE_HEADER_MAGIC_NUMBER = 0xFF00FF00;
> +
> +    /** Size of the largest of the three markers above */
> +    static final int MAX_MARKER_SIZE =
> +        Math.max( RECORD_HEADER_SIZE, Math.max( RECORD_FOOTER_SIZE, LOG_FILE_HEADER_SIZE ) );
> +
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,500 @@
> +
> +package org.apache.directory.server.log.impl;
> +
> +import java.nio.ByteBuffer;
> +
> +import java.util.concurrent.locks.Lock;
> +import java.util.concurrent.locks.ReentrantLock;
> +import java.util.concurrent.locks.Condition;
> +import java.util.concurrent.atomic.AtomicInteger;
> +
> +
> +import java.io.IOException;
> +
> +import org.apache.directory.server.i18n.I18n;
> +import org.apache.directory.server.log.InvalidLogException;
> +import org.apache.directory.server.log.LogAnchor;
> +import org.apache.directory.server.log.UserLogRecord;
> +
> +/**
> + * Manages the flushing of log to media and scanning of logs. All appends to the log file go through this class.
> + *
> + * Internally it manages a circular buffer where appends initially go. Records are first
> + * appended to this in-memory circular buffer. When the buffer fills up, or when the user requests it,
> + * the memory buffer is flushed to the underlying media.
> + *
> + * @author<a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
> + */
> +class LogFlushManager
> +{
> +
> +    /** Ever increasing logical log sequence number assigned to user log records. Bumped up under append lock */
> +    private long logLSN = Long.MIN_VALUE + 1;
> +
> +    /** Memory buffer size in bytes */
> +    private final int logBufferSize;
> +
> +    /** Synchronizes appends */
> +    private final Lock appendLock = new ReentrantLock();
> +
> +    /** Synchronizes flushes to media */
> +    private final Lock flushLock = new ReentrantLock();
> +
> +    /** Used to wait on ongoing flush */
> +    private final Condition flushCondition = flushLock.newCondition();
> +
> +    /** In memory LogBuffer */
> +    private LogBuffer logBuffer;
> +
> +    /** Flush status */
> +    private FlushStatus flushStatus = new FlushStatus();
> +
> +    /** Current LogFile appends go to */
> +    private LogFileManager.LogFileWriter currentLogFile;
> +
> +    /** Log manager */
> +    LogManager logManager;
> +
> +    /** Size of data appended to the currentLogFile so far */
> +    long appendedSize;
> +
> +    /** Soft limit on the log file size */
> +    long targetLogFileSize;
> +
> +    /**
> +     * Creates a flush manager on top of the given log manager.
> +     *
> +     * @param logManager log manager used to switch between log files
> +     * @param logMemoryBufferSize size in bytes of the in-memory log buffer, must not be negative
> +     * @param logFileSize size after which appends move on to the next log file, must not be negative
> +     * @throws IllegalArgumentException if either size is negative
> +     */
> +    public LogFlushManager(LogManager logManager, int logMemoryBufferSize, long logFileSize )
> +    {
> +        boolean sizesAreValid = ( logMemoryBufferSize >= 0 ) && ( logFileSize >= 0 );
> +
> +        if ( !sizesAreValid )
> +        {
> +            throw new IllegalArgumentException( I18n.err( I18n.ERR_748, logMemoryBufferSize, logFileSize ) );
> +        }
> +
> +        this.logManager = logManager;
> +        this.logBufferSize = logMemoryBufferSize;
> +        this.targetLogFileSize = logFileSize;
> +
> +        // No log file is open yet at this point, so the writer handed to the buffer is still unset
> +        this.logBuffer = new LogBuffer( this.logBufferSize, this.currentLogFile );
> +    }
> +
> +    /**
> +     * Appends the given user record to the log. The position where the record is appended is returned
> +     * through the log anchor of userRecord.
> +     *
> +     * A record that fits into the in-memory buffer is copied there, wrapped in a header and a footer.
> +     * A record larger than the buffer is written directly to the current log file. Before the record
> +     * is placed, the current log file is switched once the data appended to it exceeds the target size.
> +     *
> +     * The append lock is held while the record is placed and is released on every exit path,
> +     * including when an exception is thrown.
> +     *
> +     * @param userRecord provides the user data to be appended to the log
> +     * @param sync if true, this call returns after making sure that the appended data is reflected to the underlying file
> +     * @throws IOException
> +     * @throws InvalidLogException
> +     */
> +    public void append(UserLogRecord userRecord, boolean sync ) throws IOException, InvalidLogException
> +    {
> +        long lsn;
> +        boolean appendedRecord = false;
> +        byte[] userBuffer = userRecord.getDataBuffer();
> +        int length = userRecord.getDataLength();
> +        LogAnchor userLogAnchor = userRecord.getLogAnchor();
> +
> +        int recordSize = LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE + length;
> +
> +        appendLock.lock();
> +
> +        try
> +        {
> +            lsn = logLSN++;
> +
> +            if ( currentLogFile == null )
> +            {
> +                // We are just starting, get the current log file
> +                currentLogFile = logManager.switchToNextLogFile( null );
> +                appendedSize = currentLogFile.getLength();
> +            }
> +
> +            if ( appendedSize > this.targetLogFileSize )
> +            {
> +                // Make sure everything outstanding goes to the current log file
> +                this.flush( lsn, null, 0, 0, true );
> +
> +                currentLogFile = logManager.switchToNextLogFile( currentLogFile );
> +                appendedSize = currentLogFile.getLength();
> +            }
> +
> +            if ( recordSize <= logBufferSize )
> +            {
> +                ByteBuffer writeHead = logBuffer.writeHead;
> +
> +                while ( !appendedRecord )
> +                {
> +                    // First get the rewind count then the position to which the readhead advanced
> +                    int readHeadRewindCount = logBuffer.readHeadRewindCount.get();
> +                    int readHeadPosition = logBuffer.readHeadPosition;
> +
> +                    if ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount ) ||
> +                        ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount + 1 ) &&
> +                            ( readHeadPosition < writeHead.position() ) ) )
> +                    {
> +                        // Write head is ahead of the read head in the same lap: free space is the tail of the buffer
> +                        if ( writeHead.remaining() >= recordSize )
> +                        {
> +                            // NOTE(review): the header length field is written as the user data length here,
> +                            // while the format comment in LogFileRecords and doFlush treat it as
> +                            // header + data + footer. Confirm against the log scanner.
> +                            this.writeHeader( writeHead, length, lsn );
> +                            writeHead.put( userBuffer, 0, length );
> +                            this.writeFooter( writeHead, 0 );
> +                            appendedRecord = true;
> +                        }
> +                        else // ( writeHead.remaining() < recordSize )
> +                        {
> +                            if ( writeHead.remaining() >= LogFileRecords.RECORD_HEADER_SIZE )
> +                            {
> +                                // Write a skip record
> +                                this.writeHeader( writeHead, -1, -1 );
> +                            }
> +
> +                            // rewind buffer now
> +                            writeHead.rewind();
> +                            logBuffer.writeHeadRewindCount++;
> +                        }
> +                    }
> +                    else
> +                    {
> +                        assert( logBuffer.writeHeadRewindCount == ( readHeadRewindCount + 1 ) ) :
> +                                "Unexpected sequence number for read/write heads:" + logBuffer.writeHeadRewindCount +
> +                                " " + readHeadRewindCount;
> +
> +                        // Write head wrapped around and is behind the read head: only the gap between them is free
> +                        if ( ( readHeadPosition - writeHead.position() ) > recordSize )
> +                        {
> +                            this.writeHeader( writeHead, length, lsn );
> +                            writeHead.put( userBuffer, 0, length );
> +                            this.writeFooter( writeHead, 0 );
> +                            appendedRecord = true;
> +                        }
> +                        else
> +                        {
> +                            // Not enough room, flush to let the read head advance and retry
> +                            this.flush( lsn, null, 0, 0, true );
> +                        }
> +                    }
> +                }
> +            }
> +            else
> +            {
> +                // Record does not fit into the memory buffer, write it straight to the log file
> +                this.flush( lsn, userBuffer, 0, length, true );
> +            }
> +
> +            userLogAnchor.resetLogAnchor( currentLogFile.logFileNumber(), appendedSize, lsn );
> +            this.appendedSize += recordSize;
> +        }
> +        finally
> +        {
> +            // Always release the lock, otherwise a failed append would block every later one
> +            appendLock.unlock();
> +        }
> +
> +        if ( sync )
> +        {
> +            this.flush( lsn, null, 0, 0, false );
> +        }
> +    }
> +
> +    /**
> +     * Flushes the changes in the log buffer up to the given point. The given point is determined as follows:
> +     *
> +     * appendLock is held: flushLSN is the highest lsn generated by the logging system and no more appends can
> +     * proceed. In this case the log is flushed up to where the write head is. The log record with flushLSN
> +     * might not have been appended yet.
> +     *
> +     * Otherwise: the record with flushLSN is already appended to the log. The log is flushed up to
> +     * max(flushLSN, current flushStatus.uptoLSN).
> +     *
> +     * Also userBuffer != null =>  appendLockHeld == true
> +     *
> +     * Only one thread can do a flush at a time. A thread that finds a flush already going on waits for it
> +     * and is woken up to do its own flush.
> +     *
> +     * flushStatus.uptoLSN represents the highest lsn that any thread wanted to sync. If several threads wait
> +     * on a sync to complete, the thread that wakes up and does the sync flushes up to flushStatus.uptoLSN on
> +     * behalf of all of them, so that logging is more efficient.
> +     *
> +     * The in-progress marker is cleared and waiters are signalled even when the flush fails with an
> +     * exception. In that case the flushed lsn is left unchanged.
> +     *
> +     * @param flushLSN max LSN the calling thread wants to sync up to
> +     * @param userBuffer if not null, user buffer is appended to the log without any buffering
> +     * @param offset offset of data in user buffer
> +     * @param length length of user data
> +     * @param appendLockHeld true if append lock is held
> +     * @throws IOException
> +     */
> +    private void flush( long flushLSN, byte[] userBuffer, int offset, int length,
> +                        boolean appendLockHeld ) throws IOException
> +    {
> +        long uptoLSN = flushLSN;
> +
> +        if ( appendLockHeld )
> +        {
> +            // The record with flushLSN may not be in the log yet, so only the previous one can be requested
> +            uptoLSN--;
> +        }
> +
> +        flushLock.lock();
> +
> +        try
> +        {
> +            // Update max requested lsn if necessary
> +            if ( uptoLSN > flushStatus.uptoLSN )
> +            {
> +                flushStatus.uptoLSN = uptoLSN;
> +            }
> +
> +            /*
> +             * Check if we need to do flush and wait for ongoing flush if
> +             * necessary
> +             */
> +            while ( true )
> +            {
> +                if ( ( flushStatus.flushedLSN >= uptoLSN ) && !appendLockHeld )
> +                {
> +                    // Somebody else already flushed what we need
> +                    return;
> +                }
> +
> +                if ( !flushStatus.flushInProgress )
> +                {
> +                    break;
> +                }
> +
> +                flushStatus.numWaiters++;
> +                flushCondition.awaitUninterruptibly();
> +                flushStatus.numWaiters--;
> +            }
> +
> +            // Mark flush in progress and do the flush
> +            flushStatus.flushInProgress = true;
> +
> +            // If not appendlock held, adjust uptoLSN with the max one requested by any thread
> +            if ( !appendLockHeld )
> +            {
> +                uptoLSN = flushStatus.uptoLSN;
> +            }
> +            else
> +            {
> +                uptoLSN = flushLSN;
> +            }
> +        }
> +        finally
> +        {
> +            flushLock.unlock();
> +        }
> +
> +        long flushedLSN = LogAnchor.UNKNOWN_LSN;
> +        boolean flushSucceeded = false;
> +
> +        try
> +        {
> +            flushedLSN = this.doFlush( uptoLSN, appendLockHeld );
> +
> +            // Now if there is a user buffer, flush from that
> +            if ( userBuffer != null )
> +            {
> +                ByteBuffer headerFooterHead = logBuffer.headerFooterHead;
> +
> +                headerFooterHead.rewind();
> +                this.writeHeader( headerFooterHead, length, flushLSN );
> +
> +                // The number of bytes to append is the header size, not the header magic number
> +                currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_HEADER_SIZE );
> +
> +                currentLogFile.append( userBuffer, offset, length );
> +
> +                headerFooterHead.rewind();
> +                this.writeFooter( headerFooterHead, 0 );
> +                currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_FOOTER_SIZE );
> +
> +                flushedLSN = flushLSN;
> +            }
> +
> +            currentLogFile.sync();
> +            flushSucceeded = true;
> +        }
> +        finally
> +        {
> +            flushLock.lock();
> +
> +            try
> +            {
> +                // Only publish the new flushed lsn when the data really reached the media
> +                if ( flushSucceeded && ( flushedLSN != LogAnchor.UNKNOWN_LSN ) )
> +                {
> +                    flushStatus.flushedLSN = flushedLSN;
> +
> +                    if ( flushStatus.flushedLSN > flushStatus.uptoLSN )
> +                    {
> +                        // This should only happen with append lock held
> +                        assert( appendLockHeld == true ) : "FlushedLSN went ahead of uptoLSN while appendlock is not held: " + flushStatus.flushedLSN + "  " + flushStatus.uptoLSN;
> +
> +                        flushStatus.uptoLSN = flushStatus.flushedLSN;
> +                    }
> +                }
> +
> +                // Always let the next flusher in, otherwise a failed flush would block all waiters forever
> +                flushStatus.flushInProgress = false;
> +
> +                if ( flushStatus.numWaiters != 0 )
> +                {
> +                    flushCondition.signalAll();
> +                }
> +            }
> +            finally
> +            {
> +                flushLock.unlock();
> +            }
> +        }
> +    }
> +
> +
> +    /**
> +     * Walks the log buffer from the read head and writes it to the underlying log file until the record with
> +     * uptoLSN or, when the append lock is held, the current write head is reached.
> +     *
> +     * Records are gathered into one contiguous run per pass over the buffer and each run is written with a
> +     * single append. When the end of the buffer is reached (less than a header left, or a skip record), the
> +     * read head is rewound and the walk continues from the start of the buffer.
> +     *
> +     * NOTE(review): this method treats the header length field as header + user data + footer, while the
> +     * callers of writeHeader visible in this class pass the user data length. Confirm which one the log
> +     * scanner expects.
> +     *
> +     * @param uptoLSN max LSN until where log is flushed
> +     * @param appendLockHeld true if appendlock held.
> +     * @return lsn of the last record walked over. UNKNOWN_LSN if no record was walked over.
> +     * @throws IOException
> +     */
> +    private long doFlush( long uptoLSN, boolean appendLockHeld  ) throws IOException
> +    {
> +        ByteBuffer readHead = logBuffer.readHead;
> +        ByteBuffer writeHead = logBuffer.writeHead;
> +        boolean done = false;
> +
> +        int magicNumber;
> +        int length;
> +        long lsn = LogAnchor.UNKNOWN_LSN;
> +
> +        while ( !done )
> +        {
> +            // Number of bytes gathered in this pass, starting at logBuffer.readHeadPosition
> +            int totalLength = 0;
> +
> +            while ( true )
> +            {
> +                /*
> +                 * If append lock is held, we might hit write head. We can read
> +                 * the write head here when append lock is held
> +                 */
> +                if ( appendLockHeld )
> +                {
> +                    if ( ( writeHead.position() == readHead.position() ) &&
> +                            ( logBuffer.writeHeadRewindCount == logBuffer.readHeadRewindCount.get() ) )
> +                    {
> +                        done = true;
> +                        break;
> +                    }
> +                }
> +
> +                // If less than header length left to process, then break and flush whatever we got so far
> +                if ( readHead.remaining() < LogFileRecords.RECORD_HEADER_SIZE )
> +                {
> +                    break;
> +                }
> +
> +                magicNumber = readHead.getInt();
> +
> +                assert( magicNumber == LogFileRecords.RECORD_HEADER_MAGIC_NUMBER ) : " Record header magic " +
> +                        "number does not match " + magicNumber + " expected "+
> +                        LogFileRecords.RECORD_HEADER_MAGIC_NUMBER;
> +
> +                length = readHead.getInt();
> +
> +                // Did we hit a skip record at the end of the buffer?
> +                if ( length == LogBuffer.SKIP_RECORD_LENGTH )
> +                {
> +                    break;
> +                }
> +
> +                // Sanitize length, it includes header and footer overhead. The bound is the
> +                // combined marker size, not the sum of the magic numbers.
> +                assert( length > ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE ) ) :
> +                    "Record length doesnt make sense:" + length + " expected:" +
> +                    ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE );
> +
> +                // Add to the total length
> +                totalLength += length;
> +
> +                lsn = readHead.getLong();
> +
> +                // Move to the next record, we processed 16 bytes already
> +                readHead.position( readHead.position() + length - 16 );
> +
> +                if ( lsn >= uptoLSN )
> +                {
> +                    done = true;
> +                    break;
> +                }
> +            }
> +
> +            // If there is something to flush, then do it now
> +            if ( totalLength > 0 )
> +            {
> +                int offset = logBuffer.readHeadPosition;
> +
> +                currentLogFile.append( logBuffer.buffer, offset, totalLength );
> +
> +                // move the position to the next record
> +                logBuffer.readHeadPosition = readHead.position();
> +            }
> +
> +            if ( !done )
> +            {
> +                // this means we need to rewind and keep flushing
> +                logBuffer.readHeadPosition = 0;
> +                readHead.rewind();
> +                logBuffer.readHeadRewindCount.incrementAndGet();
> +            }
> +        }
> +
> +        return lsn;
> +    }
> +
> +    /**
> +     * Writes a record header at the current position of the given buffer: the header magic number,
> +     * the length, the lsn and finally a checksum computed as length XOR lsn.
> +     *
> +     * @param buffer destination, advanced by the size of the header
> +     * @param length value stored in the length field of the header
> +     * @param lsn value stored in the lsn field of the header
> +     */
> +    private void writeHeader ( ByteBuffer buffer, int length, long lsn )
> +    {
> +        long headerChecksum = length ^ lsn;
> +
> +        buffer.putInt( LogFileRecords.RECORD_HEADER_MAGIC_NUMBER )
> +              .putInt( length )
> +              .putLong( lsn )
> +              .putLong( headerChecksum );
> +    }
> +
> +    /**
> +     * Writes a record footer at the current position of the given buffer: the checksum
> +     * followed by the footer magic number.
> +     *
> +     * @param buffer destination, advanced by the size of the footer
> +     * @param checksum value stored in the checksum field of the footer
> +     */
> +    private void writeFooter ( ByteBuffer buffer, int checksum )
> +    {
> +        buffer.putInt( checksum )
> +              .putInt( LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER );
> +    }
> +
> +
> +    /**
> +     * Bundles the in-memory log buffer with the cursors used to fill and drain it.
> +     * The read head and the write head are two independent views over the same byte array.
> +     */
> +    private static class LogBuffer
> +    {
> +        /** Length value marking a header that only pads the end of the log buffer and must be skipped */
> +        final static int SKIP_RECORD_LENGTH = -1;
> +
> +        /** Backing storage of the in-memory log */
> +        byte[] buffer;
> +
> +        /** Cursor over the buffer used when appending records */
> +        ByteBuffer writeHead;
> +
> +        /** Number of times the write head wrapped around; compared with the read head count to avoid overwriting nonflushed data */
> +        int writeHeadRewindCount;
> +
> +        /** Cursor over the buffer used when reading records back in order to flush them */
> +        ByteBuffer readHead;
> +
> +        /** Position up to which data has been flushed; advanced as the read head flushes data */
> +        int readHeadPosition;
> +
> +        /** Number of times the read head wrapped around; compared with the write head count to avoid overwriting nonflushed data */
> +        AtomicInteger readHeadRewindCount;
> +
> +        /** Scratch storage for a header or a footer when a user buffer is written directly */
> +        byte[] headerFooterBuffer;
> +
> +        /** Cursor used to format the header footer buffer */
> +        ByteBuffer headerFooterHead;
> +
> +        /**
> +         * Allocates the buffers and places both heads at the start of the log buffer.
> +         *
> +         * @param bufferSize size in bytes of the in-memory log buffer
> +         * @param currentLogFile not used by this constructor
> +         */
> +        public LogBuffer( int bufferSize, LogFileManager.LogFileWriter currentLogFile )
> +        {
> +            this.buffer = new byte[bufferSize];
> +            this.writeHead = ByteBuffer.wrap( this.buffer );
> +            this.readHead = ByteBuffer.wrap( this.buffer );
> +            this.readHeadRewindCount = new AtomicInteger();
> +
> +            this.headerFooterBuffer = new byte[LogFileRecords.MAX_MARKER_SIZE];
> +            this.headerFooterHead = ByteBuffer.wrap( this.headerFooterBuffer );
> +        }
> +    }
> +
> +    /**
> +     * Used to group the flush related data together.
> +     *
> +     * Both lsn fields start at Long.MIN_VALUE, which is below the first lsn handed out by the
> +     * enclosing class (Long.MIN_VALUE + 1). With the Java default of 0 every negative lsn would
> +     * compare as already flushed and sync requests for them would return without flushing.
> +     */
> +    private static class FlushStatus
> +    {
> +        /** whether flush is going on */
> +        boolean flushInProgress;
> +
> +        /** Highest lsn any thread asked to flush up to; nothing requested initially */
> +        long uptoLSN = Long.MIN_VALUE;
> +
> +        /** Highest lsn flushed so far; nothing flushed initially */
> +        long flushedLSN = Long.MIN_VALUE;
> +
> +        /** Keeps track of the number of waiters */
> +        int numWaiters;
> +    }
> +}
>
> Added: directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java
> URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java?rev=1177627&view=auto
> ==============================================================================
> --- directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java (added)
> +++ directory/apacheds/trunk/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java Fri Sep 30 12:28:59 2011
> @@ -0,0 +1,494 @@
> +
> +package org.apache.directory.server.log.impl;
> +
> +import java.nio.ByteBuffer;
> +
> +import java.util.concurrent.locks.ReentrantLock;
> +import java.util.concurrent.locks.Lock;
> +
> +import java.io.IOException;
> +import java.io.FileNotFoundException;
> +import java.io.EOFException;
> +
> +import org.apache.directory.server.i18n.I18n;
> +import org.apache.directory.server.log.InvalidLogException;
> +import org.apache.directory.server.log.LogAnchor;
> +import org.apache.directory.server.log.LogAnchorComparator;
> +import org.apache.directory.server.log.LogScanner;
> +import org.apache.directory.server.log.UserLogRecord;
> +
> +class LogManager
> +{
> +
> +    /**  Controlfile record size */
> +    private final static int CONTROLFILE_RECORD_SIZE = 36;
> +
> +    /** Controlfile file magic number */
> +    private final static int CONTROLFILE_MAGIC_NUMBER = 0xFF11FF11;
> +
> +    /** Controlfile log file number */
> +    private final static long CONTROLFILE_LOG_FILE_NUMBER = -1;
> +
> +    /** Shadow Controlfile log file number */
> +    private final static long CONTROLFILE_SHADOW_LOG_FILE_NUMBER = -2;
> +
> +    /** buffer used to do IO on controlfile */
> +    byte controlFileBuffer[] = new byte[CONTROLFILE_RECORD_SIZE];
> +
> +    /** ByteBuffer used to do IO on the control file */
> +    ByteBuffer controlFileMarker = ByteBuffer.wrap( controlFileBuffer );
> +
> +    /** Current checkpoint record in memory */
> +    ControlFileRecord controlFileRecord = new ControlFileRecord();
> +
> +    /** Min needed point in the log */
> +    LogAnchor minLogAnchor = new LogAnchor();
> +
> +    /** Protects minLogAnchor */
> +    Lock minLogAnchorLock = new ReentrantLock();
> +
> +    /** Log file manager */
> +    LogFileManager logFileManager;
> +
> +    /** Log Anchor comparator */
> +    LogAnchorComparator anchorComparator = new LogAnchorComparator();
> +
> +    /** Current log file */
> +    private long currentLogFileNumber;
> +
> +    /** Buffer used to read log file markers */
> +    byte markerBuffer[] = new byte[LogFileRecords.LOG_FILE_HEADER_SIZE];
> +
> +    /** ByteBuffer wrapper for the marker buffer */
> +    ByteBuffer markerHead = ByteBuffer.wrap( markerBuffer );
> +
> +
> +    /**
> +     * Creates a log manager that works on top of the given log file manager.
> +     *
> +     * @param logFileManager log file manager kept by this instance for its log file operations
> +     */
> +    public LogManager( LogFileManager logFileManager )
> +    {
> +        this.logFileManager = logFileManager;
> +    }
> +
> +    /**
> +     * Initializes the log management:
> +     * 1) Checks if control file exists and creates it if necessary. If it exists, it reads it and loads the latest checkpoint.
> +     * 2) Starts from the latest checkpoint and scans forward through the logs to check for corrupted logs and determine the end of the log.
> +     * This scan ends either when a properly ended log file is found or a partially written log record is found.
> +     *
> +     * @throws IOException
> +     * @throws InvalidLogException
> +     */
> +    public void initLogManager() throws IOException, InvalidLogException
> +    {
> +        LogAnchor scanPoint = new LogAnchor();
> +        LogScanner scanner;
> +        UserLogRecord logRecord;
> +        LogFileManager.LogFileReader reader;
> +
> +        // Read and verify control file
> +        boolean controlFileExists = true;
> +
> +        try
> +        {
> +            this.readControlFile();
> +        }
> +        catch ( FileNotFoundException e )
> +        {
> +            // Not an error: the missing control file is handled by the else branch below
> +            controlFileExists = false;
> +        }
> +
> +        if ( controlFileExists )
> +        {
> +            boolean invalidLog = false;
> +
> +            // Set the min log anchor from the control file
> +            minLogAnchor.resetLogAnchor( controlFileRecord.minNeededLogFile,
> +                    controlFileRecord.minNeededLogFileOffset, controlFileRecord.minNeededLSN );
> +
> +            scanPoint.resetLogAnchor( minLogAnchor );
> +
> +            logRecord = new UserLogRecord();
> +            scanner = new DefaultLogScanner( scanPoint, logFileManager );
> +
> +            try
> +            {
> +                while ( scanner.getNextRecord( logRecord ) )
> +                {
> +                    // No need to do anything with the log record, we only
> +                    // want the scanner to advance to the last good position
> +                }
> +            }
> +            catch ( InvalidLogException e )
> +            {
> +                // Remember it: the tail of the log is repaired further down
> +                invalidLog = true;
> +            }
> +            finally
> +            {
> +                scanner.close();
> +            }
> +
> +            long lastGoodLogFileNumber = scanner.getLastGoodFileNumber();
> +            long lastGoodLogFileOffset = scanner.getLastGoodOffset();
> +            currentLogFileNumber = lastGoodLogFileNumber;
> +
> +            if ( ( lastGoodLogFileNumber < LogAnchor.MIN_LOG_NUMBER ) ||
> +                ( lastGoodLogFileOffset < LogAnchor.MIN_LOG_OFFSET ) )
> +            {
> +                throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +            }
> +
> +            scanPoint.resetLogAnchor( lastGoodLogFileNumber, lastGoodLogFileOffset,
> +                    LogAnchor.UNKNOWN_LSN );
> +
> +            // The last good position must not be before the checkpointed position
> +            if ( anchorComparator.compare( scanPoint, minLogAnchor ) < 0 )
> +            {
> +                throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +            }
> +
> +            /*
> +             * If invalid content at the end of file:
> +             * if we are past the header of file, then
> +             * truncate the file to the end of the last
> +             * read log record, otherwise we read a partially
> +             * written log file header, in this case reformat the log file.
> +             * Also check next for the existence of next file to make
> +             * sure we really read the last log file.
> +             */
> +            if ( invalidLog )
> +            {
> +                // Check if next log file exists
> +                reader = null;
> +
> +                try
> +                {
> +                    reader = logFileManager.getReaderForLogFile( ( lastGoodLogFileNumber + 1 ) );
> +                }
> +                catch ( FileNotFoundException e )
> +                {
> +                    // Fine, this is what we want
> +                }
> +                finally
> +                {
> +                    if ( reader != null )
> +                    {
> +                        reader.close();
> +                    }
> +                }
> +
> +                // A next file exists, so the invalid content was not at the tail of the log
> +                if ( reader != null )
> +                {
> +                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +                }
> +
> +                if ( lastGoodLogFileOffset >= LogFileRecords.LOG_FILE_HEADER_SIZE )
> +                {
> +                    logFileManager.truncateLogFile( lastGoodLogFileNumber, lastGoodLogFileOffset );
> +                }
> +                else
> +                {
> +                    // Reformat the existing log file
> +                    this.createNextLogFile( true );
> +                }
> +            }
> +        }
> +        else
> +        {
> +            /*
> +             * Control file does not exist. Either we are at the very beginning or
> +             * maybe we crashed in the middle of creating the first log file.
> +             * We should have the min log file at most with the file header formatted.
> +             */
> +            reader = null;
> +            boolean fileExists = false;
> +            currentLogFileNumber = LogAnchor.MIN_LOG_NUMBER;
> +
> +            try
> +            {
> +                reader = logFileManager.getReaderForLogFile( LogAnchor.MIN_LOG_NUMBER );
> +
> +                // Anything beyond the header means log data exists without a control file
> +                if ( reader.getLength() > LogFileRecords.LOG_FILE_HEADER_SIZE )
> +                {
> +                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +                }
> +
> +                fileExists = true;
> +            }
> +            catch ( FileNotFoundException e )
> +            {
> +                // Fine, we will create the file
> +            }
> +            finally
> +            {
> +                if ( reader != null )
> +                {
> +                    reader.close();
> +                }
> +            }
> +
> +            // Reformat the min log file if it is already there, create it otherwise
> +            this.createNextLogFile( fileExists );
> +
> +            // Prepare the min log anchor and control file and write the control file
> +            minLogAnchor.resetLogAnchor( LogAnchor.MIN_LOG_NUMBER, LogFileRecords.LOG_FILE_HEADER_SIZE, LogAnchor.UNKNOWN_LSN );
> +
> +            this.writeControlFile();
> +        }
> +    }
> +
> +    /**
> +     * Called by LogFlushManager to switch to the next file.
> +     *
> +     * Note: Currently the control file is rewritten (which also deletes the log files that
> +     * are no longer needed) every time we switch to a new file. Some of these tasks can be
> +     * delegated to a background thread later.
> +     *
> +     * @param currentWriter current log file used by the flush manager. Null if the flush manager is just starting up,
> +     * in which case no new file is created and a writer for the current log file is returned.
> +     * @return writer for the log file to be used from now on.
> +     * @throws IOException
> +     * @throws InvalidLogException
> +     */
> +    public LogFileManager.LogFileWriter switchToNextLogFile( LogFileManager.LogFileWriter currentWriter ) throws IOException, InvalidLogException
> +    {
> +        if ( currentWriter == null )
> +        {
> +            // Startup: keep using the current log file
> +            return logFileManager.getWriterForLogFile( currentLogFileNumber );
> +        }
> +
> +        // Retire the old file, write the control file, then move on to a fresh file
> +        currentWriter.close();
> +        writeControlFile();
> +        createNextLogFile( false );
> +
> +        return logFileManager.getWriterForLogFile( currentLogFileNumber );
> +    }
> +
> +    /**
> +     * Called when the logging subsystem is notified about the minimum position
> +     * in the log files that is needed. Log manager uses this information to advance
> +     * its checkpoint and delete unnecessary log files.
> +     *
> +     * The anchor only moves forward: a position that does not compare greater than
> +     * the current min log anchor is ignored.
> +     *
> +     * @param newLogAnchor min needed log anchor, a null value is ignored
> +     */
> +    public void advanceMinLogAnchor( LogAnchor newLogAnchor )
> +    {
> +        if ( newLogAnchor == null )
> +        {
> +            return;
> +        }
> +
> +        minLogAnchorLock.lock();
> +
> +        try
> +        {
> +            if ( anchorComparator.compare( minLogAnchor, newLogAnchor ) < 0 )
> +            {
> +                minLogAnchor.resetLogAnchor( newLogAnchor );
> +            }
> +        }
> +        finally
> +        {
> +            // Always release the lock, even if the comparison or the reset fails
> +            minLogAnchorLock.unlock();
> +        }
> +    }
> +
> +    /**
> +     * Writes the control file. To make partially written control files unlikely,
> +     * data is first written to a shadow file and then moved(renamed) to the controlfile.
> +     * Move of a file is atomic in POSIX systems, in GFS like file systems(in HDFS for example).
> +     * On windows, it is not always atomic but atomic versions of rename operations started to
> +     * appear in their recent file systems.
> +     *
> +     * Before the record is written, log files below the min needed log file are deleted
> +     * (best effort) and the min existing log file is advanced accordingly.
> +     *
> +     * @throws IOException
> +     */
> +    private void writeControlFile() throws IOException
> +    {
> +        // Copy the min log file anchor
> +        minLogAnchorLock.lock();
> +
> +        try
> +        {
> +            controlFileRecord.minNeededLogFile = minLogAnchor.getLogFileNumber();
> +            controlFileRecord.minNeededLogFileOffset = minLogAnchor.getLogFileOffset();
> +            controlFileRecord.minNeededLSN = minLogAnchor.getLogLSN();
> +        }
> +        finally
> +        {
> +            // Always release the lock, even if reading the anchor fails
> +            minLogAnchorLock.unlock();
> +        }
> +
> +        if ( controlFileRecord.minNeededLogFile > controlFileRecord.minExistingLogFile )
> +        {
> +            this.deleteUnnecessaryLogFiles( controlFileRecord.minExistingLogFile, controlFileRecord.minNeededLogFile );
> +            controlFileRecord.minExistingLogFile = controlFileRecord.minNeededLogFile;
> +        }
> +
> +        // TODO compute checksum for log record here
> +
> +        // Serialize the record. NOTE(review): controlFileMarker presumably is a view over
> +        // controlFileBuffer, which is what gets appended below - confirm at the declaration.
> +        controlFileMarker.rewind();
> +        controlFileMarker.putLong( controlFileRecord.minExistingLogFile );
> +        controlFileMarker.putLong( controlFileRecord.minNeededLogFile );
> +        controlFileMarker.putLong( controlFileRecord.minNeededLogFileOffset );
> +        controlFileMarker.putLong( controlFileRecord.minNeededLSN );
> +        controlFileMarker.putLong( controlFileRecord.checksum );
> +        controlFileMarker.putInt( CONTROLFILE_MAGIC_NUMBER );
> +
> +        // Start from an empty shadow file: a leftover one is truncated to zero length
> +        boolean shadowFileExists = logFileManager.createLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER );
> +
> +        if ( shadowFileExists )
> +        {
> +            logFileManager.truncateLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER, 0 );
> +        }
> +
> +        LogFileManager.LogFileWriter controlFileWriter = logFileManager.getWriterForLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER );
> +
> +        try
> +        {
> +            controlFileWriter.append( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE );
> +            controlFileWriter.sync();
> +        }
> +        finally
> +        {
> +            controlFileWriter.close();
> +        }
> +
> +        // Do the move now
> +        logFileManager.rename( CONTROLFILE_SHADOW_LOG_FILE_NUMBER, CONTROLFILE_LOG_FILE_NUMBER );
> +    }
> +
> +    /**
> +     * Loads the control file into the control file record and verifies its content.
> +     *
> +     * @throws IOException if reading the control file fails
> +     * @throws InvalidLogException if the control file is shorter than a full record or
> +     * contains inconsistent values
> +     * @throws FileNotFoundException propagated to the caller, which treats it as
> +     * "no control file yet" (presumably raised when the reader is opened)
> +     */
> +    private void readControlFile() throws IOException, InvalidLogException, FileNotFoundException
> +    {
> +        LogFileManager.LogFileReader reader = logFileManager.getReaderForLogFile( CONTROLFILE_LOG_FILE_NUMBER );
> +
> +        try
> +        {
> +            reader.read( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE );
> +        }
> +        catch ( EOFException eof )
> +        {
> +            // Fewer bytes than a full control record
> +            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) , eof );
> +        }
> +        finally
> +        {
> +            reader.close();
> +        }
> +
> +        // Decode the fields in the same order writeControlFile() stores them
> +        controlFileMarker.rewind();
> +        controlFileRecord.minExistingLogFile = controlFileMarker.getLong();
> +        controlFileRecord.minNeededLogFile = controlFileMarker.getLong();
> +        controlFileRecord.minNeededLogFileOffset = controlFileMarker.getLong();
> +        controlFileRecord.minNeededLSN = controlFileMarker.getLong();
> +        controlFileRecord.checksum = controlFileMarker.getLong();
> +        int magic = controlFileMarker.getInt();
> +
> +        boolean existingFileTooLow = controlFileRecord.minExistingLogFile < LogAnchor.MIN_LOG_NUMBER;
> +
> +        boolean neededPositionTooLow = ( controlFileRecord.minNeededLogFile < LogAnchor.MIN_LOG_NUMBER )
> +            || ( controlFileRecord.minNeededLogFileOffset < LogAnchor.MIN_LOG_OFFSET );
> +
> +        // The oldest file kept around can not be newer than the oldest file needed
> +        boolean existingAfterNeeded = controlFileRecord.minExistingLogFile > controlFileRecord.minNeededLogFile;
> +
> +        boolean wrongMagic = magic != CONTROLFILE_MAGIC_NUMBER;
> +
> +        // TODO compute and compare checksum
> +
> +        if ( existingFileTooLow || neededPositionTooLow || existingAfterNeeded || wrongMagic )
> +        {
> +            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +        }
> +    }
> +
> +    /**
> +     * Creates the next log file and makes it the current log file. If the log file already
> +     * exists, then it is reformatted, that is, it is truncated and the file header is
> +     * written again.
> +     *
> +     * @param reformatExistingFile if true, the current log file must already exist and is reformatted.
> +     * If false, the file following the current one is created and must not exist yet.
> +     * @throws IOException
> +     * @throws InvalidLogException if the existence of the file does not match the expectation
> +     */
> +    private void createNextLogFile( boolean reformatExistingFile ) throws IOException, InvalidLogException
> +    {
> +        // Reformatting targets the current file, otherwise we move on to the next number
> +        long targetFileNumber = reformatExistingFile ? this.currentLogFileNumber : this.currentLogFileNumber + 1;
> +
> +        // Try to create the file.
> +        boolean alreadyThere = logFileManager.createLogFile( targetFileNumber );
> +
> +        // Either a new file was unexpectedly around, or the file to reformat was missing
> +        if ( alreadyThere != reformatExistingFile )
> +        {
> +            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
> +        }
> +
> +        if ( reformatExistingFile )
> +        {
> +            logFileManager.truncateLogFile( targetFileNumber, LogAnchor.MIN_LOG_OFFSET );
> +        }
> +
> +        LogFileManager.LogFileWriter headerWriter = logFileManager.getWriterForLogFile( targetFileNumber );
> +
> +        try
> +        {
> +            // File header: log file number followed by the header magic number
> +            markerHead.rewind();
> +            markerHead.putLong( targetFileNumber );
> +            markerHead.putInt( LogFileRecords.LOG_FILE_HEADER_MAGIC_NUMBER );
> +            headerWriter.append( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE );
> +            headerWriter.sync();
> +        }
> +        finally
> +        {
> +            headerWriter.close();
> +        }
> +
> +        // Only becomes the current file once its header is safely written
> +        this.currentLogFileNumber = targetFileNumber;
> +    }
> +
> +    /**
> +     * Deletes the log files in the given range of log file numbers.
> +     *
> +     * @param startingLogFileNumber first log file to delete (inclusive)
> +     * @param endingLogFileNumber log file number at which deletion stops (exclusive)
> +     */
> +    private void deleteUnnecessaryLogFiles( long startingLogFileNumber, long endingLogFileNumber )
> +    {
> +        long fileToDelete = startingLogFileNumber;
> +
> +        while ( fileToDelete < endingLogFileNumber )
> +        {
> +            // Do a best effort delete
> +            logFileManager.deleteLogFile( fileToDelete );
> +            fileToDelete++;
> +        }
> +    }
> +
> +    /**
> +     * Checkpoint record. In-memory image of the content of the control file:
> +     * the fields are stored in the control file in the order they are declared here,
> +     * followed by the control file magic number.
> +     */
> +     private class ControlFileRecord
> +     {
> +         // Lowest log file number still kept around; files below the min needed
> +         // log file are deleted when the control file is written
> +         long minExistingLogFile;
> +         // Log file number of the min needed log anchor
> +         long minNeededLogFile;
> +         // Offset of the min needed log anchor inside minNeededLogFile
> +         long minNeededLogFileOffset;
> +         // LSN of the min needed log anchor
> +         long minNeededLSN;
> +         // Checksum of the record; stored and loaded, but not yet computed or verified (see TODOs)
> +         long checksum;
> +     }
> +
> +
> +
> +}
>
>