You are viewing a plain text version of this content. The canonical link for it is here.
Posted to derby-commits@db.apache.org by oy...@apache.org on 2007/10/18 10:49:08 UTC

svn commit: r585900 - in /db/derby/code/trunk/java/engine/org/apache/derby: iapi/services/replication/master/ impl/services/replication/buffer/ impl/services/replication/master/ impl/services/replication/slave/ impl/store/raw/log/

Author: oysteing
Date: Thu Oct 18 01:49:07 2007
New Revision: 585900

URL: http://svn.apache.org/viewvc?rev=585900&view=rev
Log:
DERBY-2977 (Contributed by Jorgen Loland): Fixes a problem with the way log is appended to the repliation buffer. Previously, LogToFile#appendLogRecord was used to write log to the replication master log buffer. However, the LogAccessFile class that wraps write operations to log file also write checksums. We need these checksums at the slave because without them, the log instants on the slave and master will differ, in turn making recovery impossible (undo operations refer to a specific instant, which will not be correct).

With this patch, log is appended to the repliation log buffer in LogAccessFile, not LogToFile like now. The patch modifies the following files:

M java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
M java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java

The buffer now accepts chunks of log records from LogAccessFile#writeToLog (i.e., appended to replication buffer in the same method that writes the log to disk) instead of single log records from LogToFile#appendLogRecord

M java/engine/org/apache/derby/impl/services/replication/slave/ReplicationLogScan.java

Modified the slave-side log parser to read the new chunk of log record format

M java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
M java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java

Modified appendLog signature to accept log from LogAccessFile instead of from LogToFile

M java/engine/org/apache/derby/impl/store/raw/log/LogAccessFile.java

Can be set inReplicationMasterMode, in which log is appended to MasterFactory

M java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java

Removed the code that appends log to replication log buffer, and tells LogAccessFile to go into replication master mode when needed. 

Modified:
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/ReplicationLogScan.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFile.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFileBuffer.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/services/replication/master/MasterFactory.java Thu Oct 18 01:49:07 2007
@@ -117,24 +117,17 @@
     public void stopMaster();
 
     /**
-     * Append a single log record to the replication log buffer.
-     *
-     * @param dataLength            number of bytes in data[]
-     * @param instant               the log address of this log record.
-     * @param data                  "from" array to copy "data" portion of rec
-     * @param dataOffset            offset in data[] to start copying from.
-     * @param optionalData          "from" array to copy "optional data" from
-     * @param optionalDataOffset    offset in optionalData[] to start copy from
-     * @param optionalDataLength    number of bytes in optionalData[]
+     * Append a chunk of log records to the log buffer.
      *
+     * @param greatestInstant   the instant of the log record that was
+     *                          added last to this chunk of log
+     * @param log               the chunk of log records
+     * @param logOffset         offset in log to start copy from
+     * @param logLength         number of bytes to copy, starting
+     *                          from logOffset
      **/
-    public void appendLogRecord(int dataLength,
-                                long instant,
-                                byte[] data,
-                                int dataOffset,
-                                byte[] optionalData, 
-                                int optionalDataOffset,
-                                int optionalDataLength);
+    public void appendLog(long greatestInstant,
+                          byte[] log, int logOffset, int logLength);
 
     /**
      * Used by the LogFactory to notify the replication master

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/LogBufferElement.java Thu Oct 18 01:49:07 2007
@@ -25,21 +25,18 @@
 
 /**
  * ReplicationLogBuffer consists of n LogBufferElements, each of which
- * can store m log records in a single byte[].
- *
+ * can store a number of log records in a single byte[].
  * <p>
- * The format of each log record in the LogBufferElement is:
- * <br>
- * (long)   instant <br>
- * (int)    dataLength <br>
- * (int)    dataOffset <br>
- * (int)    optionalDataLength <br>
- * (int)    optionalDataOffset <br>
- * (byte[]) data (with length dataLength) <br>
- * (byte[]) optionalData (with length optionalDataLength) <br>
- * <br>
+ * The format of each log record in the LogBufferElement is the same
+ * as is written to log file in LogAccessFile:<br>
+ *
+ * (int)    total_length (data[].length + optionaldata[].length)<br>
+ * (long)   instant<br>
+ * (byte[]) data+optionaldata<br>
+ * (int)    total_length<br>
+ *
  * </p>
- * In addition to adding the log record information to the byte[], the
+ * In addition to adding a chunk of log records to the byte[], the
  * greatestInstant variable is updated for every append so that
  * getLastInstant can be used to get the highest log instant in this
  * LogBufferElement.
@@ -69,47 +66,26 @@
     }
 
     /**
-     * Append a single log record to this LogBufferElement.
+     * Append a chunk of log records to this LogBufferElement.
      *
-     * @param instant               the log address of this log record.
-     * @param dataLength            number of bytes in data[]
-     * @param dataOffset            offset in data[] to start copying from.
-     * @param optionalDataLength    number of bytes in optionalData[]
-     * @param optionalDataOffset    offset in optionalData[] to start copy from
-     * @param data                  "from" array to copy "data" portion of rec
-     * @param optionalData          "from" array to copy "optional data" from
+     * @param greatestInstant   the instant of the log record that was
+     *                          added last to this chunk of log
+     * @param log               the chunk of log records
+     * @param logOffset         offset in log to start copy from
+     * @param logLength         number of bytes to copy, starting
+     *                          from logOffset
      **/
-    protected void appendLogRecord(long instant,
-                                int dataLength,
-                                int dataOffset,
-                                int optionalDataLength,
-                                int optionalDataOffset,
-                                byte[] data,
-                                byte[] optionalData){
+    protected void appendLog(long greatestInstant,
+                             byte[] log, int logOffset, int logLength) {
 
         if (SanityManager.DEBUG){
-            int totalSize = dataLength + optionalDataLength +
-                ReplicationLogBuffer.LOG_RECORD_FIXED_OVERHEAD_SIZE;
-            SanityManager.ASSERT(freeSize() >= totalSize,
-                                 "Log record does not fit into"+
+            SanityManager.ASSERT(freeSize() >= logLength,
+                                 "Log chunk does not fit into"+
                                  " this LogBufferElement");
         }
 
-        position = appendLong(instant, position);
-        position = appendInt(dataLength, position);
-        position = appendInt(dataOffset, position);
-        position = appendInt(optionalDataLength, position);
-        position = appendInt(optionalDataOffset, position);
-
-        if (dataLength > 0){
-            position = appendBytes(data, position, dataLength);
-        }
-
-        if (optionalDataLength > 0) {
-            position = appendBytes(optionalData, position, optionalDataLength);
-        }
-
-        this.greatestInstant = instant;
+        this.greatestInstant = greatestInstant;
+        position = appendBytes(log, logOffset, position, logLength);
     }
 
     /**
@@ -161,45 +137,23 @@
 
     /**
      * Append a byte[] to this LogBufferElement.
+     *
+     * @param b       where the bytes are copied from
+     * @param offset  offset in b to start copying from
+     * @param pos     the position in this LogBufferElement to start copying to
+     * @param length  number of bytes to copy from b, starting from offset
+     *
      * @return new position
      */
-    private int appendBytes(byte b[], int pos, int length) {
+    private int appendBytes(byte b[], int offset, int pos, int length) {
         if (SanityManager.DEBUG){
-            SanityManager.ASSERT(freeSize() >= (pos+length),
+            SanityManager.ASSERT(freeSize() >= length,
                                  "byte[] is to big to fit"+
                                  " into this buffer");
             SanityManager.ASSERT(b != null, "Cannot append null to buffer");
         }
-        System.arraycopy(b, 0, bufferdata, pos, length);
+        System.arraycopy(b, offset, bufferdata, pos, length);
         return pos + length;
-    }
-
-    /**
-     * Append an int to this LogBufferElement.
-     * @return new position
-     */
-    private int appendInt(int i, int p) {
-        bufferdata[p++] = (byte) (i >> 24);
-        bufferdata[p++] = (byte) (i >> 16);
-        bufferdata[p++] = (byte) (i >> 8);
-        bufferdata[p++] = (byte) i;
-        return p;
-    }
-
-    /**
-     * Append a long to this LogBufferElement.
-     * @return new position
-     */
-    private int appendLong(long l, int p) {
-        bufferdata[p++] = (byte) (l >> 56);
-        bufferdata[p++] = (byte) (l >> 48);
-        bufferdata[p++] = (byte) (l >> 40);
-        bufferdata[p++] = (byte) (l >> 32);
-        bufferdata[p++] = (byte) (l >> 24);
-        bufferdata[p++] = (byte) (l >> 16);
-        bufferdata[p++] = (byte) (l >> 8);
-        bufferdata[p++] = (byte) l;
-        return p;
     }
 
 }

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java Thu Oct 18 01:49:07 2007
@@ -37,12 +37,12 @@
  *
  * ReplicationLogBuffer consists of a number of LogBufferElements.
  * Elements that are not in use are in the freeBuffers list, while
- * elements that contains dirty log are in dirtyBuffers. Log records
+ * elements that contains dirty log are in dirtyBuffers. Chunks of log records
  * are appended to the buffer element in currentDirtyBuffer. Hence,
  * the life cycle of buffer elements is:
  * freeBuffers -> currentDirtyBuffer -> dirtyBuffers -> freeBuffers
  *
- * To append log records to the buffer, use appendLogRecord(...)
+ * To append chunks of log records to the buffer, use appendLog(...)
  *
  * To consume chunks of log records, use next() followed by getData(),
  * getLastInstant() and getSize(). These get-methods throw
@@ -58,17 +58,10 @@
 
     private static final int DEFAULT_NUMBER_LOG_BUFFERS = 10;
 
-    protected static final int LOG_RECORD_FIXED_OVERHEAD_SIZE = 24;
-    // long instant           - 8
-    // int dataLength         - 4
-    // int dataOffset         - 4
-    // int optionalDataLength - 4
-    // int optionalDataOffset - 4
-
     private final LinkedList dirtyBuffers;// LogBufferElements with unsent log
     private final LinkedList freeBuffers; // currently unused LogBufferElements
 
-    // the buffer we currently APPEND log records to
+    // the buffer we currently APPEND chunks of log records to
     private LogBufferElement currentDirtyBuffer;
 
     // used to GET data from this buffer. next() sets these
@@ -79,10 +72,10 @@
 
     // Two objects to synchronize on so that the logger (LogToFile)
     // and the log consumer (LogShipping service) can use the buffer
-    // concurrently (although appendLogRecord may conflict with next).
+    // concurrently (although appendLog may conflict with next).
     // In cases where both latches are needed at the same time,
     // listLatch is always set first to avoid deadlock. listLatch is
-    // used by appendLogRecord and next to synchronize operations on
+    // used by appendLog and next to synchronize operations on
     // the free and dirty buffer lists and on currentDirtyBuffer.
     // outputLatch is used by next and getXXX to synchronize on the
     // output data variables
@@ -110,42 +103,22 @@
     }
 
     /**
-     * Append a single log record to the log buffer.
+     * Append a chunk of log records to the log buffer.
      *
-     * @param instant               the log address of this log record.
-     * @param dataLength            number of bytes in data[]
-     * @param dataOffset            offset in data[] to start copying from.
-     * @param optionalDataLength    number of bytes in optionalData[]
-     * @param optionalDataOffset    offset in optionalData[] to start copy from
-     * @param data                  "from" array to copy "data" portion of rec
-     * @param optionalData          "from" array to copy "optional data" from
+     * @param greatestInstant   the instant of the log record that was
+     *                          added last to this chunk of log
+     * @param log               the chunk of log records
+     * @param logOffset         offset in log to start copy from
+     * @param logLength         number of bytes to copy, starting
+     *                          from logOffset
      *
      * @throws LogBufferFullException - thrown if there is not enough
-     * free space in the buffer to store the log record.
+     * free space in the buffer to store the chunk of log.
      **/
-    public void appendLogRecord(long instant,
-                                int dataLength,
-                                int dataOffset,
-                                int optionalDataLength,
-                                int optionalDataOffset,
-                                byte[] data,
-                                byte[] optionalData)
+    public void appendLog(long greatestInstant,
+                          byte[] log, int logOffset, int logLength)
         throws LogBufferFullException{
 
-        /* format of log to write:
-         *
-         * (long)   instant
-         * (int)    dataLength
-         * (int)    dataOffset
-         * (int)    optionalDataLength
-         * (int)    optionalDataOffset
-         * (byte[]) data
-         * (byte[]) optionalData
-         */
-
-        int totalLength = dataLength + optionalDataLength +
-                          LOG_RECORD_FIXED_OVERHEAD_SIZE;
-
         synchronized (listLatch) {
             if (currentDirtyBuffer == null) {
                 switchDirtyBuffer();
@@ -155,31 +128,20 @@
 
             // switch buffer if current buffer does not have enough space
             // for the incoming data
-            if (totalLength > currentDirtyBuffer.freeSize()) {
+            if (logLength > currentDirtyBuffer.freeSize()) {
                 switchDirtyBuffer();
             }
 
-            if (totalLength <= currentDirtyBuffer.freeSize()) {
-                currentDirtyBuffer.appendLogRecord(instant,
-                                                   dataLength,
-                                                   dataOffset,
-                                                   optionalDataLength,
-                                                   optionalDataOffset,
-                                                   data,
-                                                   optionalData);
+            if (logLength <= currentDirtyBuffer.freeSize()) {
+                currentDirtyBuffer.appendLog(greatestInstant,
+                                             log, logOffset, logLength);
             } else {
-                // The log record requires more space than one
+                // The chunk of log records requires more space than one
                 // LogBufferElement with default size. Create a new big
                 // enough LogBufferElement
-                LogBufferElement current = new LogBufferElement(totalLength);
+                LogBufferElement current = new LogBufferElement(logLength);
                 current.setRecyclable(false);
-                current.appendLogRecord(instant,
-                                        dataLength,
-                                        dataOffset,
-                                        optionalDataLength,
-                                        optionalDataOffset,
-                                        data,
-                                        optionalData);
+                current.appendLog(greatestInstant, log, logOffset, logLength);
                 dirtyBuffers.addLast(current);
                 // currentDirtyBuffer has already been handed over to
                 // the dirtyBuffers list, and an empty one is in
@@ -242,7 +204,7 @@
                     outBufferLastInstant = current.getLastInstant();
 
                     // recycle = false if the LogBufferElement has been
-                    // used to store a single very big log record
+                    // used to store a very big chunk of log records
                     if (current.isRecyclable()) {
                         freeBuffers.addLast(current);
                     }
@@ -280,7 +242,7 @@
     }
 
     /**
-     * Method to determine whether or not the buffer had log record
+     * Method to determine whether or not the buffer had any log records
      * the last time next() was called.
      *
      * @return true if the buffer contained log records the last time

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java Thu Oct 18 01:49:07 2007
@@ -32,6 +32,7 @@
 import org.apache.derby.iapi.store.raw.data.DataFactory;
 
 import org.apache.derby.iapi.services.replication.master.MasterFactory;
+import org.apache.derby.impl.services.replication.net.ReplicationMessageTransmit;
 import org.apache.derby.impl.services.replication.buffer.ReplicationLogBuffer;
 import org.apache.derby.impl.services.replication.buffer.LogBufferFullException;
 
@@ -62,7 +63,7 @@
     private ReplicationLogBuffer logBuffer;
     // waiting for code to go into trunk:
     //    private LogShipper logShipper; 
-    //    private NetworkTransmit connection; 
+    private ReplicationMessageTransmit transmitter; 
 
     private String replicationMode;
     private String slavehost;
@@ -106,12 +107,21 @@
             slaveport = new Integer(port).intValue();
         }
 
-        // Added when Network Service has been committed to trunk
-        // connection = new NetworkTransmit();
-
         System.out.println("MasterController booted");
     }
 
+    private boolean setupConnection(){
+        try {
+            transmitter = new ReplicationMessageTransmit(slavehost, slaveport);
+            transmitter.initConnection();
+            return true;
+        } catch (Exception e) {
+            // printline used for debugging - will be removed
+            System.out.println("(MC) Got an exception during setupConnection:");
+            return false;
+        }
+    }
+
     /**
      * Will stop the replication master service
      *
@@ -174,8 +184,9 @@
         dataFactory = dataFac;
         logFactory = logFac;
         logBuffer = new ReplicationLogBuffer(DEFAULT_LOG_BUFFER_SIZE);
-        //  logFactory.setReplicationMaster(this); //added later
 
+        // May want to move this below connectblock later when
+        // database is not filesystem copied to slave. 
         logFactory.startReplicationMasterRole(this);
 
         if (replicationMode.equals(MasterFactory.ASYNCHRONOUS_MODE)) {
@@ -211,28 +222,20 @@
     }
 
     /**
-     * Append a single log record to the replication log buffer.
-     *
-     * @param dataLength            number of bytes in data[]
-     * @param instant               the log address of this log record.
-     * @param data                  "from" array to copy "data" portion of rec
-     * @param dataOffset            offset in data[] to start copying from.
-     * @param optionalData          "from" array to copy "optional data" from
-     * @param optionalDataOffset    offset in optionalData[] to start copy from
-     * @param optionalDataLength    number of bytes in optionalData[]
+     * Append a chunk of log records to the log buffer.
      *
+     * @param greatestInstant   the instant of the log record that was
+     *                          added last to this chunk of log
+     * @param log               the chunk of log records
+     * @param logOffset         offset in log to start copy from
+     * @param logLength         number of bytes to copy, starting
+     *                          from logOffset
      **/
-    public void appendLogRecord(int dataLength,
-                                long instant,
-                                byte[] data,
-                                int dataOffset,
-                                byte[] optionalData, 
-                                int optionalDataOffset,
-                                int optionalDataLength) {
+    public void appendLog(long greatestInstant,
+                          byte[] log, int logOffset, int logLength){
+
         try {
-            logBuffer.appendLogRecord(instant, dataLength, dataOffset,
-                                      optionalDataLength, optionalDataOffset,
-                                      data, optionalData);
+            logBuffer.appendLog(greatestInstant, log, logOffset, logLength);
         } catch (LogBufferFullException lbfe) {
             // Waiting for log shipper to implement this
             // We have multiple alternatives: 

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/ReplicationLogScan.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/ReplicationLogScan.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/ReplicationLogScan.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/ReplicationLogScan.java Thu Oct 18 01:49:07 2007
@@ -41,8 +41,11 @@
  * is initialized by calling init(...). Every time next() is called
  * after that, ReplicationLogScan reads a new log record from
  * logToScan. If next() returns true, the next log record was
- * successfully read. The information in this last read log record can
- * be retrieved by using the get-methods.
+ * successfully read. The information in this last read log record
+ * either indicates that a log file switch has taken place on the
+ * master (isLogSwitch() = true) or it is a normal log record which
+ * information can be retrieved by using the get-methods (if
+ * isLogRecord() = true).
  * </p>
  * <p>
  * Threads: The class does not provide thread synchronization since it
@@ -53,13 +56,9 @@
  * </p>
  * <p>
  * The format of the log chunk byte[] is defined in 
- * org.apache.derby.impl.services.replication.buffer.LogBufferElement
- * </p>
- * <p>
- * @see
- * org.apache.derby.impl.services.replication.buffer.LogBufferElement
- * org.apache.derby.impl.services.replication.buffer.LogBufferElement
+ * org.apache.derby.impl.store.raw.log.LogAccessFile
  * </p>
+ * @see org.apache.derby.impl.store.raw.log.LogAccessFile
  */
 
 class ReplicationLogScan {
@@ -71,13 +70,23 @@
     // get-methods to retrieve these
     private long currentInstant;
     private int currentDataOffset;
-    private int currentOptDataOffset;
     private byte[] currentData;
-    private byte[] currentOptData;
-    // validLogRecord = true when the above variables contain
-    // meaningful data. false before next() is called for the first
-    // time and after next() has reached the end of logToScan
-    private boolean validLogRecord;
+
+    /** hasInfo = true when the scan will return meaningful
+     * information, either in the form of a log record (in which case
+     * the above variables will be set), or when it has found a log
+     * record indicating that a log file switch has taken place on the
+     * master (isLogSwitch = true). hasInfo = false before next() is
+     * called for the first time, after next() has reached the end of
+     * logToScan and if an error occured when parsing logToScan (i.e.
+     * when next() has thrown a StandardException)
+     */
+    private boolean hasInfo;
+
+    /** true if the last read log record indicates a log switch, false
+     * if it is a normal log record private boolean isLogSwitch;
+     */
+    private boolean isLogSwitch;
 
     protected ReplicationLogScan() { }
 
@@ -92,11 +101,9 @@
 
         currentPosition = 0;
         currentInstant = -1;
-        currentDataOffset = -1;
-        currentOptDataOffset = -1;
         currentData = null;
-        currentOptData = null;
-        validLogRecord = false;
+        isLogSwitch = false;
+        hasInfo = false;
     }
 
     /**
@@ -107,32 +114,38 @@
      * <p>
      * Side effects: <br>
      * <br>
-     * On a successful read (return true): setting currentInstant,
-     * currentDataOffset, currentOptDataOffset, currentData,
-     * currentOptData, validLogRecord = true. Also updates
-     * currentPosition to point to the byte immediatly following the
-     * log record.
+     * On a successful read (return true): either...<br>
+     *<br>
+     * ... the scan read a log record indicating that a log file
+     * switch has taken place on the master, in which case
+     * isLogFileSwitch() returns true. In this case, getXXX will not
+     * contain valid data. Asserts handle calls to these methods when
+     * in sane mode. currentPosition is updated to point to the byte
+     * immediately following this log file switch log record.<br>
+     *<br>
+     * ... or the scan read a normal log record, in which case
+     * isLogRecord() returns true. Also sets currentInstant and
+     * currentData, and updates currentPosition to point to the byte
+     * immediatly following the log record. In this case, getXXX will
+     * return meaningful information about the log record.
      * </p>
      * <p>
      * If there are no more log records in logToScan (returns false) or
      * a problem occurs (throws StandardException): setting
-     * validLogRecord = false
+     * hasInfo = false
      * </p>
      * @return true if a log record was successfully read, false if end
      * of logToScan has been reached.
-     * @throws StandardException if logToScan is found to be corrupt.
+     * @throws StandardException if logToScan is found to be corrupted.
      */
     protected boolean next() throws StandardException {
 
         /* format of received log:
          *
+         * (int)    total_length (data[].length + optionaldata[].length)
          * (long)   instant
-         * (int)    dataLength
-         * (int)    dataOffset
-         * (int)    optionalDataLength
-         * (int)    optionalDataOffset
-         * (byte[]) data
-         * (byte[]) optionalData
+         * (byte[]) data+optionaldata
+         * (int)    total_length
          */
 
         if (SanityManager.DEBUG){
@@ -149,181 +162,180 @@
             // not be possible for currentPosition to be greater than
             // logToScan.length if not an exception was thrown by the
             // previous next() call
-            validLogRecord = false;
-            return validLogRecord;
+            hasInfo = false;
+            return hasInfo;
         }
 
         try {
-            currentInstant = retrieveLong();       // (long) instant
-            int currentDataLength = retrieveInt(); // (int)  dataLength
-            currentDataOffset = retrieveInt();     // (int)  dataOffset
-                                                   // (int)  optionalDataLength
-            int currentOptDataLength = retrieveInt();
-            currentOptDataOffset = retrieveInt();  // (int)  optionalDataOffset
-
-            // (byte[]) data
-            currentData = new byte[currentDataLength];
-            retrieveBytes(currentData, currentDataLength);
-
-            // (byte[]) optionalData
-            currentOptData = new byte[currentOptDataLength];
-            retrieveBytes(currentOptData, currentOptDataLength);
+            int currentLength = retrieveInt();   // (int)  dataLength
 
-            validLogRecord = true;
+            if (currentLength == 0) { 
+                // int value 0 is written to log to mark EOF. A length
+                // of 0 therefore means that a log file switch has
+                // taken place on the master
+                isLogSwitch = true;
+                hasInfo = true;
+            } else {
+
+                currentInstant = retrieveLong(); // (long) instant
+
+                // (byte[]) data
+                currentData = new byte[currentLength];
+                retrieveBytes(currentData, currentLength);
+
+                retrieveInt();                   // (int) trailing length
+
+                isLogSwitch = false;
+                hasInfo = true;
+            }
         } catch(StandardException se){
             // Means that we have tried to read higher byte addresses
             // than the size of logToScan. That should not happen as
-            // long as logToScan is not currupt. E.g., this could mean
-            // that one of the data or optional data lengths were
+            // long as logToScan is not corrupted. E.g., this could mean
+            // that the data length was
             // wrong. No matter what caused us to be outside the
             // logToScan size, we will not be able to read more log
             // from this logToScan, and should probably abort the
-            // whole replication due to corrupt data. That decision is
+            // whole replication due to corrupted data. That decision is
             // not made here, however.
-            validLogRecord = false;
+            hasInfo = false;
             throw se;
         }
 
-        return validLogRecord;
+        return hasInfo;
     }
 
     /**
      * @return The instant of the log record read by the last call to
-     * next().
+     * next(). Only returns meaningful information if isLogRecord()
+     * returns true.
      * @throws NoSuchElementException if next() has not been called or
      * if there are no more log records in this chunk of log. Should
      * never be thrown unless ReplicationLogScan is used in a wrong
      * way.
      */
     protected long getInstant() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentInstant;
-        } else {
+        if (!hasInfo) {
             throw new NoSuchElementException();
         }
+
+        if (isLogSwitch) {
+            if (SanityManager.DEBUG){
+                SanityManager.THROWASSERT("Log switch log records " +
+                                          "have no instant");
+            }
+            return -1;
+        }
+
+        return currentInstant;
     }
 
     /**
      * @return The number of bytes in the byte[] returned by getData()
-     * for the log record read by the last call to next().
+     * for the log record read by the last call to next(). Only
+     * returns meaningful information if isLogRecord() returns true.
      * @throws NoSuchElementException if next() has not been called or
      * if there are no more log records in this chunk of log. Should
      * never be thrown unless ReplicationLogScan is used in a wrong
      * way.
      */
     protected int getDataLength() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentData.length;
-        } else {
+        if (!hasInfo) {
             throw new NoSuchElementException();
         }
-    }
 
-    /**
-     * @return The offset in the byte[] returned by getData() for the
-     * log record read by the last call to next().
-     * @throws NoSuchElementException if next() has not been called or
-     * if there are no more log records in this chunk of log. Should
-     * never be thrown unless ReplicationLogScan is used in a wrong
-     * way.
-     */
-    protected int getDataOffset() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentDataOffset;
-        } else {
-            throw new NoSuchElementException();
+        if (isLogSwitch) {
+            if (SanityManager.DEBUG){
+                SanityManager.THROWASSERT("Log switch log records " +
+                                          "have no length");
+            }
+            return -1;
         }
+
+        return currentData.length;
     }
 
     /**
-     * @return The number of bytes in the byte[] returned by
-     * getOptData() for the log record read by the last call to next().
+     * Method to get the data byte[] read by the last call to next().
+     * Note that this byte[] contains both byte[] data and byte[]
+     * optional_data from LogAccessFile. To split this byte[] into
+     * data and optional_data, we would need to create a Loggable
+     * object from it because the log does not provide information on
+     * where to split. There is no need to split since this byte[]
+     * will only be written to the slave log anyway. If it was split,
+     * LogAccessFile would simply merge them when writing to file.
+     *
+     * @return The byte[] containing data+optional_data of the log
+     * record read by the last call to next(). Only returns meaningful
+     * information if isLogRecord() returns true.
      * @throws NoSuchElementException if next() has not been called or
      * if there are no more log records in this chunk of log. Should
      * never be thrown unless ReplicationLogScan is used in a wrong
      * way.
      */
-    protected int getOptDataLength() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentOptData.length;
-        } else {
+    protected byte[] getData() throws NoSuchElementException{
+        if (!hasInfo) {
             throw new NoSuchElementException();
         }
-    }
 
-    /**
-     * @return The offset in the byte[] returned by getOptData() for
-     * the log record read by the last call to next().
-     * @throws NoSuchElementException if next() has not been called or
-     * if there are no more log records in this chunk of log. Should
-     * never be thrown unless ReplicationLogScan is used in a wrong
-     * way.
-     */
-    protected int getOptDataOffset() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentOptDataOffset;
-        } else {
-            throw new NoSuchElementException();
+        if (isLogSwitch) {
+            if (SanityManager.DEBUG){
+                SanityManager.THROWASSERT("Log switch log records " +
+                                          "have no data");
+            }
+            return null;
         }
+
+        return currentData;
     }
 
     /**
-     * @return The data byte[] of the log record read by the last call
-     * to next().
-     * @throws NoSuchElementException if next() has not been called or
-     * if there are no more log records in this chunk of log. Should
-     * never be thrown unless ReplicationLogScan is used in a wrong
-     * way.
+     * Used to determine whether or not the last call to next() was
+     * successful.
+     * @return true if next() has been called and the end of the log
+     * chunk has not yet been reached. Returns the same answer as the
+     * last call to next() did. Use isLogFileSwitch() and
+     * isLogRecord() to find out if the current log record indicates a
+     * log file switch or is a normal log record.
      */
-    protected byte[] getData() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentData;
-        } else {
-            throw new NoSuchElementException();
-        }
+    protected boolean hasValidInformation() {
+        return hasInfo;
     }
 
     /**
-     * @return The optionalData byte[] of the log record read by the
-     * last call to next().
+     * Used to determine whether the last call to next() read a log
+     * record
+     * @return true if the last call to next() found a normal log
+     * record.
      * @throws NoSuchElementException if next() has not been called or
      * if there are no more log records in this chunk of log. Should
      * never be thrown unless ReplicationLogScan is used in a wrong
      * way.
      */
-    protected byte[] getOptData() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentOptData;
-        } else {
+    protected boolean isLogRecord()  throws NoSuchElementException{
+        if (!hasInfo) {
             throw new NoSuchElementException();
         }
+
+        return !isLogSwitch;
     }
 
     /**
-     * @return Length of byte[] data + byte[] optionalData of the log
-     * record read by the last call to next(). This is the same number
-     * as is stored in the normal Derby transaction log as "lenght" in
-     * the head and tail of each log record.
+     * Used to determine whether the last call to next() found a log
+     * file switch
+     * @return true if the last call to next() found a log record
+     * indicating a log file switch has taken place on the master.
      * @throws NoSuchElementException if next() has not been called or
      * if there are no more log records in this chunk of log. Should
      * never be thrown unless ReplicationLogScan is used in a wrong
      * way.
      */
-    protected int getTotalDataLength() throws NoSuchElementException{
-        if (validLogRecord) {
-            return currentData.length + currentOptData.length;
-        } else {
+    protected boolean isLogFileSwitch() throws NoSuchElementException{
+        if (!hasInfo) {
             throw new NoSuchElementException();
         }
-    }
 
-    /**
-     * @return true if next() has been called and the end of the log
-     * chunk has not yet been reached. Returns the same answer as the
-     * last call to next() did.
-     */
-    protected boolean hasValidLogRecord() {
-        return validLogRecord;
+        return isLogSwitch;
     }
 
     /*

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFile.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFile.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFile.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFile.java Thu Oct 18 01:49:07 2007
@@ -36,6 +36,7 @@
 
 import org.apache.derby.iapi.services.io.FormatIdOutputStream;
 import org.apache.derby.iapi.services.io.ArrayOutputStream;
+import org.apache.derby.iapi.services.replication.master.MasterFactory;
 import org.apache.derby.iapi.store.raw.RawStoreFactory;
 
 
@@ -107,6 +108,9 @@
 	static int                      mon_numWritesToLog;
 	static int                      mon_numBytesToLog;
 
+    // the MasterFactory that will accept log when in replication master mode
+    MasterFactory masterFac; 
+    boolean inReplicationMasterMode = false;
 
 	//streams used to generated check sume log record ; see if there is any simpler way
 	private ArrayOutputStream logOutputBuffer;
@@ -202,6 +206,7 @@
 		}
 		
 		currentBuffer.init(checksumLogRecordSize);
+		logFactory.checkForReplication(this);
 	}
 
 
@@ -263,6 +268,7 @@
                                                  optional_data_length);
             currentBuffer.position = newpos;
             currentBuffer.bytes_free -= total_log_record_length;
+            currentBuffer.greatest_instant = instant;
             if (SanityManager.DEBUG) {
                 int normalizedPosition = currentBuffer.position;
                 if (writeChecksum) {
@@ -326,7 +332,7 @@
             // following direct log to file call finishes.
 
 			// write the log record directly to the log file.
-            writeToLog(bigbuffer, 0, bigBufferLength);
+            writeToLog(bigbuffer, 0, bigBufferLength, instant);
         }
     }
 
@@ -346,7 +352,7 @@
      * @param optional_data_offset offset in "optional_data" to start copy from
      * @param optional_data_length length of optional data to copy.
      *
-     * @see #writeLogRecord
+     * @see LogAccessFile#writeLogRecord
      */
     private int appendLogRecordToBuffer(byte[] buff, int pos,
                                         int length,
@@ -517,8 +523,9 @@
 			
 			while(nFlushed < noOfBuffers)
 			{
-				if (buf.position != 0)
-					writeToLog(buf.buffer, 0, buf.position);
+				if (buf.position != 0) {
+					writeToLog(buf.buffer, 0, buf.position, buf.greatest_instant);
+				}
 
 				nFlushed++;
 				synchronized(this)
@@ -704,9 +711,29 @@
 		}
 	}
 
+    /**
+     * Make this LogAccessFile pass chunks of log records (byte[]) to
+     * the MasterFactory when the chunks are written to disk.
+     * @param masterFac The MasterFactory service responsible for
+     * controlling the master side replication behaviour.
+     */
+    protected void setReplicationMasterRole(MasterFactory masterFac) {
+        this.masterFac = masterFac;
+        inReplicationMasterMode = true;
+    }
+
+    /**
+     * Stop this LogAccessFile from passing chunks of log records to
+     * the MasterFactory.
+     */
+    protected void stopReplicationMasterRole() {
+        inReplicationMasterMode = false;
+        masterFac = null;
+    }
 
 	/* write to the log file */
-	private void writeToLog(byte b[], int off, int len) throws IOException
+	private void writeToLog(byte b[], int off, int len, long highestInstant)
+		throws IOException
 	{
 		synchronized(logFileSemaphore)
 		{
@@ -724,6 +751,10 @@
                     try 
                     {
                         log.write(b, off, len);
+                        if (inReplicationMasterMode) {
+                            masterFac.appendLog(highestInstant,
+                                                b, off, len);
+                        }
                         break;
                     }
                     catch (IOException ioe)
@@ -880,7 +911,7 @@
 		int    p    = 0; //end is written in the beginning of the buffer, no
 						 //need to checksum a int write.
 		p = writeInt(marker , b , p);
-		writeToLog(b, 0, p);
+		writeToLog(b, 0, p, -1); //end marker has no instant
 	}
 
 	

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFileBuffer.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFileBuffer.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFileBuffer.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogAccessFileBuffer.java Thu Oct 18 01:49:07 2007
@@ -40,6 +40,7 @@
     protected int       bytes_free;
     protected int       position;
 	protected int       length;
+    protected long      greatest_instant;
 
     LogAccessFileBuffer next;
     LogAccessFileBuffer prev;
@@ -67,6 +68,7 @@
 		length =  buffer.length - reserve;
         bytes_free  = length;
         position    = reserve;
+        greatest_instant = -1;
     }
 
     /**************************************************************************

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java?rev=585900&r1=585899&r2=585900&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java Thu Oct 18 01:49:07 2007
@@ -3564,22 +3564,6 @@
                     length, instant, data, offset, 
                     optionalData, optionalDataOffset, optionalDataLength);
 
-                if (inReplicationMasterMode) {
-                    // Append this log record to the replication log
-                    // buffer so that it can be shipped to the slave.
-                    // Note that the length field is not the same as
-                    // used by writeLogRecord which uses the length of
-                    // data+optionalData - masterFactory needs the
-                    // length of data alone.
-                    masterFactory.appendLogRecord(length - optionalDataLength,
-                                                  instant,
-                                                  data, 
-                                                  offset,
-                                                  optionalData,
-                                                  optionalDataOffset,
-                                                  optionalDataLength);
-                }
-
 				if (optionalDataLength != 0) 
                 {
 					if (SanityManager.DEBUG)
@@ -4893,7 +4877,21 @@
     public void startReplicationMasterRole(MasterFactory masterFactory) 
         throws StandardException {
         this.masterFactory = masterFactory;
-        inReplicationMasterMode = true;
+        synchronized(this) {
+            // checkpoint followed by flushAll ensures that all data
+            // and log are written on disk. After this, the database
+            // can be safely copied to the slave location provided
+            // that no clients perform operations on the database
+            // before a connection has been established with the
+            // slave. Note: this is a hack that will be removed once
+            // the repliation functionality is able to send the
+            // database from master to slave using the network
+            // connection.
+            rawStoreFactory.checkpoint();
+            flushAll();
+            inReplicationMasterMode = true;
+            logOut.setReplicationMasterRole(masterFactory);
+        }
     }
 
     /**
@@ -4904,6 +4902,20 @@
     public void stopReplicationMasterRole() {
         inReplicationMasterMode = false;
         masterFactory = null;
+        logOut.stopReplicationMasterRole();
+    }
+
+    /**
+     * Used by LogAccessFile to check if it should take the
+     * replication master role, and thereby send log records to the
+     * MasterFactory.
+     * @param log The LogAccessFile that will take the replication
+     * master role iff this database is master.
+     */
+    protected void checkForReplication(LogAccessFile log) {
+        if (inReplicationMasterMode) {
+            log.setReplicationMasterRole(masterFactory);
+        }
     }
 
 	/**