You are viewing a plain text version of this content. The canonical link for it is here.
Posted to derby-commits@db.apache.org by oy...@apache.org on 2008/02/22 14:54:11 UTC

svn commit: r630207 - in /db/derby/code/trunk/java: engine/org/apache/derby/iapi/error/ engine/org/apache/derby/iapi/store/raw/log/ engine/org/apache/derby/impl/services/replication/buffer/ engine/org/apache/derby/impl/services/replication/master/ engi...

Author: oysteing
Date: Fri Feb 22 05:54:08 2008
New Revision: 630207

URL: http://svn.apache.org/viewvc?rev=630207&view=rev
Log:
DERBY-3382: Slave must inform master if DBs are out of sync.
Adds a check of the log files to the replication initialization so that replication does not start if the master and slave log files are out of sync. The slave notifies the master whether or not the log files are in sync.
Cleans up anything that was already started if starting replication fails.
Contributed by Jorgen Loland

Modified:
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/error/StandardException.java
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/store/raw/log/LogFactory.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/ReadOnly.java
    db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
    db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/error/StandardException.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/error/StandardException.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/error/StandardException.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/error/StandardException.java Fri Feb 22 05:54:08 2008
@@ -289,6 +289,12 @@
 		Object[] oa = new Object[] {a1};
 		return new StandardException(messageID, oa);
 	}
+
+	public static StandardException newException(String messageID,
+												 Object[] a1) {
+		return new StandardException(messageID, a1);
+	}
+
 	public static StandardException newException(String messageID, Throwable t, Object a1) {
 		Object[] oa = new Object[] {a1};
 		return new StandardException(messageID, t, oa);

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/store/raw/log/LogFactory.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/store/raw/log/LogFactory.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/store/raw/log/LogFactory.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/store/raw/log/LogFactory.java Fri Feb 22 05:54:08 2008
@@ -172,6 +172,16 @@
 	  */
     LogInstant getFirstUnflushedInstant();
 
+    /**
+     * Get the log instant long value of the first log record that has not 
+     * been flushed. Only works after recover() has finished, or (if in slave 
+     * replication mode) after calling initializeReplicationSlaveRole.
+     *
+     * @return the log instant long value of the first log record that has not 
+     * been flushed
+     */
+    public long getFirstUnflushedInstantAsLong();
+
 	/**
 		Backup restore support
 	 */

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/buffer/ReplicationLogBuffer.java Fri Feb 22 05:54:08 2008
@@ -290,7 +290,8 @@
     /**
      * Can be used so that only the necessary log records are sent
      * when a flush(LogInstant flush_to_this) is called in the log
-     * factory.
+     * factory. Returns the highest log instant in the chunk of log that can 
+     * be read with getData().
      *
      * @return The highest log instant in the chunk of log returned by
      * getData().

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java Fri Feb 22 05:54:08 2008
@@ -93,6 +93,10 @@
      * so that it can be re-shipped to the slave.
      */
     private ReplicationMessage failedChunk = null;
+    /** The highest log instant in failedChunk  */
+    private long failedChunkHighestInstant = -1;
+    /** The highest log instant shipped so far  */
+    private long highestShippedInstant = -1;
     
     /**
      * Fill information value indicative of a low load in the log buffer.
@@ -206,6 +210,7 @@
             //log buffer.
             if (failedChunk != null) {
                 transmitter.sendMessage(failedChunk);
+                highestShippedInstant = failedChunkHighestInstant;
                 failedChunk = null;
             }
             //transmit the log record that is at the head of
@@ -217,6 +222,7 @@
                     ReplicationMessage.TYPE_LOG, logRecords);
                 
                 transmitter.sendMessage(mesg);
+                highestShippedInstant = logBuffer.getLastInstant();
                 lastShippingTime = System.currentTimeMillis();
                 return true;
             } 
@@ -229,7 +235,10 @@
         } catch (IOException ioe) {
             //An exception occurred while transmitting the log record.
             //Store the previous log record so that it can be re-transmitted
-            failedChunk = (mesg==null) ? failedChunk : mesg;
+            if (mesg != null) {
+                failedChunk = mesg;
+                failedChunkHighestInstant = logBuffer.getLastInstant();
+            }
             throw ioe;
         }
         return false;
@@ -273,6 +282,14 @@
         }
     }
     
+    /**
+     * Get the highest log instant shipped so far
+     * @return the highest log instant shipped so far
+     */
+    public long getHighestShippedInstant() {
+        return highestShippedInstant;
+    }
+
     /**
      * updates the information about the latest instance of the log record
      * that has been flushed to the disk. Calling this method has no effect

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java Fri Feb 22 05:54:08 2008
@@ -183,17 +183,27 @@
         logFactory = logFac;
         logBuffer = new ReplicationLogBuffer(DEFAULT_LOG_BUFFER_SIZE, this);
 
-        logFactory.startReplicationMasterRole(this);
+        try {
+            logFactory.startReplicationMasterRole(this);
         
-        rawStoreFactory.unfreeze();
+            rawStoreFactory.unfreeze();
 
-        setupConnection();
+            setupConnection();
 
-        if (replicationMode.equals(MasterFactory.ASYNCHRONOUS_MODE)) {
-            logShipper = new AsynchronousLogShipper(logBuffer,
-                                                    transmitter,
-                                                    this);
-            ((Thread)logShipper).start();
+            if (replicationMode.equals(MasterFactory.ASYNCHRONOUS_MODE)) {
+                logShipper = new AsynchronousLogShipper(logBuffer,
+                                                        transmitter,
+                                                        this);
+                ((Thread)logShipper).start();
+            }
+        } catch (StandardException se) {
+            // cleanup everything that may have been started before
+            // the exception was thrown
+            ReplicationLogger.logError(MessageId.REPLICATION_FATAL_ERROR, null,
+                                       dbname);
+            logFactory.stopReplicationMasterRole();
+            teardownNetwork();
+            throw se;
         }
 
         // Add code that initializes replication by sending the
@@ -212,13 +222,7 @@
         logFactory.stopReplicationMasterRole();
         try {
             logShipper.flushBuffer();
-            
-            logShipper.stopLogShipment();
-
-            ReplicationMessage mesg = new ReplicationMessage(
-                        ReplicationMessage.TYPE_STOP, null);
-
-            transmitter.sendMessage(mesg);
+            teardownNetwork();
         } catch (IOException ioe) {
             ReplicationLogger.
                 logError(MessageId.REPLICATION_LOGSHIPPER_EXCEPTION,
@@ -368,7 +372,22 @@
     private void setupConnection() throws StandardException {
         try {
             transmitter = new ReplicationMessageTransmit(slavehost, slaveport);
-            transmitter.initConnection(SLAVE_CONNECTION_ATTEMPT_TIMEOUT);
+            // getHighestShippedInstant is -1 until the first log
+            // chunk has been shipped to the slave. If a log chunk has
+            // been shipped, use the instant of the latest shipped log
+            // record to synchronize log files. If no log has been
+            // shipped yet, use the end position of the log (i.e.,
+            // logToFile.getFirstUnflushedInstantAsLong). 
+            if (logShipper != null && 
+                logShipper.getHighestShippedInstant() != -1) {
+                transmitter.initConnection(SLAVE_CONNECTION_ATTEMPT_TIMEOUT,
+                                           logShipper.
+                                           getHighestShippedInstant());
+            } else {
+                transmitter.initConnection(SLAVE_CONNECTION_ATTEMPT_TIMEOUT,
+                                           logFactory.
+                                           getFirstUnflushedInstantAsLong());
+            }
         } catch (SocketTimeoutException ste) {
             throw StandardException.newException
                     (SQLState.REPLICATION_MASTER_TIMED_OUT, dbname);
@@ -376,6 +395,8 @@
             throw StandardException.newException
                     (SQLState.REPLICATION_CONNECTION_EXCEPTION, ioe, 
                      dbname, slavehost, String.valueOf(slaveport));
+        } catch (StandardException se) {
+            throw se;
         } catch (Exception e) {
             throw StandardException.newException
                     (SQLState.REPLICATION_CONNECTION_EXCEPTION, e,
@@ -401,8 +422,21 @@
                 try {
                     transmitter = new ReplicationMessageTransmit
                             (slavehost, slaveport);
-                    transmitter.initConnection
-                            (SLAVE_CONNECTION_ATTEMPT_TIMEOUT);
+
+                    // see comment in setupConnection
+                    if (logShipper != null &&
+                        logShipper.getHighestShippedInstant() != -1) {
+                        transmitter.
+                            initConnection(SLAVE_CONNECTION_ATTEMPT_TIMEOUT,
+                                           logShipper.
+                                           getHighestShippedInstant());
+                    } else {
+                        transmitter.
+                            initConnection(SLAVE_CONNECTION_ATTEMPT_TIMEOUT,
+                                           logFactory.
+                                           getFirstUnflushedInstantAsLong());
+                    }
+
                     break;
                 } catch (SocketTimeoutException ste) {
                     continue;
@@ -435,4 +469,27 @@
     public void workToDo() {
         logShipper.workToDo();
     }
+
+    /**
+     * Stop log shipping, notify slave that replication is stopped and
+     * tear down network connection with slave.
+     */
+    private void teardownNetwork() {
+
+        if (logShipper != null) {
+            logShipper.stopLogShipment();
+        }
+
+        if (transmitter != null) {
+            try {
+                ReplicationMessage mesg =
+                    new ReplicationMessage(ReplicationMessage.TYPE_STOP, null);
+                transmitter.sendMessage(mesg);
+            } catch (IOException ioe) {}
+            try {
+                transmitter.tearDown();
+            } catch (IOException ioe) {}
+        }
+    }
+
 }

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java Fri Feb 22 05:54:08 2008
@@ -31,19 +31,6 @@
  * slave during Replication. The message is composed of a type flag
  * and corresponding object. Each type flag indicating the type of the 
  * content bundled inside the message.
- *
- * For now the following message types are defined
- *
- * TYPE_LOG - This flag will be used for all messages will carry LogRecords.
- * TYPE_ACK - this flag is used to send a acknowledgment of successful
- *            completion of a requested operation. It will however not
- *            be used to signify reception for every message transmission
- *            since tcp would automatically take care of this.
- * TYPE_ERROR - Indicates that the requested operation was not able to be
- *              completed successfully.
- * TYPE_INITIATE - used during the intial handshake between the master and
- *                 the slave. The initial handshake helps to negotiate the
- *                 message UIDs and send a appropriate error or acknowledgment.
  */
 public class ReplicationMessage implements Externalizable {
     /**
@@ -66,12 +53,31 @@
     private int type;
     
     /**
+     * used during the intial handshake between the master and
+     * the slave. The initial handshake helps to negotiate the
+     * message UIDs and send a appropriate error or acknowledgment.
+     * The object this message contains will be a <code>Long</code>.
+     * IMPORTANT: This constant must not be changed in future versions since 
+     * we need it to decide slave/master version mismatch
+     */
+    public static final int TYPE_INITIATE_VERSION = 0;
+
+    /**
+     * Used during the intial handshake between the master and
+     * the slave. Messages of this type are used to ensure that master and 
+     * slave have identical log files by checking that they will insert 
+     * the next log record on the same byte position in the log.
+     * The object this message contains will be a <code>Long</code>.
+     */
+    public static final int TYPE_INITIATE_INSTANT = 1;
+
+    /**
      * This flag will be used for all messages that carry log records.
      * The Object this message type contains will be a <code>byte</code>
      * array. The content of the byte array is the log records in the
      * binary form.
      */
-    public static final int TYPE_LOG = 0;
+    public static final int TYPE_LOG = 10;
     
     /**
      * This flag is used to send an acknowledgment of successful completion
@@ -80,34 +86,34 @@
      * type contains will be a <code>String</code>. The SQLState of the
      * error message can be used here.
      */
-    public static final int TYPE_ACK = 1;
+    public static final int TYPE_ACK = 11;
     
     /**
      * Indicates that the requested operation was not able to be
      * completed successfully. The object this message type contains
-     * will be a <code>String</code>.
-     */
-    public static final int TYPE_ERROR = 2;
-    
-    /**
-     * used during the intial handshake between the master and
-     * the slave. The initial handshake helps to negotiate the
-     * message UIDs and send a appropriate error or acknowledgment.
-     * The object this message contains will be a <code>Long</code>.
+     * will be a <code>String[]</code> where the first length-1 fields
+     * are used as arguments to create the exception and the last
+     * field contains the SQLState. The SQLState is the last element
+     * in the Array, because this means that the whole Array can be
+     * used as input to the StandardException creator. The SQLState
+     * will be ignored by the exception creator because there is one
+     * argument too many. This way we don't have to make a copy of the
+     * received Array, containing all elements except the SQLState
+     * element.
      */
-    public static final int TYPE_INITIATE = 3;
+    public static final int TYPE_ERROR = 12;
     
     /**
      * Used to send a stop replication signal to the slave. Since this
      * is a control signal the object this message contains will be null.
      */
-    public static final int TYPE_STOP = 4;
+    public static final int TYPE_STOP = 20;
     
     /**
      * Used to signal the slave that it must failover. The object associated
      * with this message will be null.
      */
-    public static final int TYPE_FAILOVER = 5;
+    public static final int TYPE_FAILOVER = 21;
     
     /**
      * public No args constructor required with Externalizable.

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java Fri Feb 22 05:54:08 2008
@@ -34,6 +34,7 @@
 import org.apache.derby.iapi.reference.MessageId;
 import org.apache.derby.iapi.reference.SQLState;
 import org.apache.derby.iapi.services.monitor.Monitor;
+import org.apache.derby.impl.store.raw.log.LogCounter;
 
 /**
  * This class is the Receiver (viz. Socket server or listener) part of the
@@ -105,6 +106,13 @@
      * has been established before the timeout, a
      * PrivilegedExceptionAction is raised with cause
      * java.net.SocketTimeoutException
+     * @param synchOnInstant the slave log instant, used to check that
+     * the master and slave log files are in synch. If no chunks of log
+     * records have been received from the master yet, this is the
+     * end position in the current log file. If a chunk of log has been
+     * received, this is the instant of the log record received last.
+     * Note that there is a difference!
+     * @param dbname the name of the replicated database
      *
      * @throws PrivilegedActionException if an exception occurs while trying
      *                                   to open a connection.
@@ -117,7 +125,8 @@
      * @throws StandardException if an incompatible database version is found.
      *
      */
-    public void initConnection(int timeout) throws
+    public void initConnection(int timeout, long synchOnInstant, String dbname)
+        throws
         PrivilegedActionException,
         IOException,
         StandardException,
@@ -143,20 +152,11 @@
         //create the SocketConnection object using the client connection.
         socketConn = new SocketConnection(client);
         
-        //wait for the initiator message on the SocketConnection
-        ReplicationMessage initMesg = readMessage();
-        
-        //Check if this message is an initiator message, if not
-        //throw an exception
-        if (initMesg.getType() != ReplicationMessage.TYPE_INITIATE) {
-            //The message format was not recognized. Hence throw
-            //an unexpected exception.
-            throw StandardException.newException
-                (SQLState.REPLICATION_UNEXPECTED_EXCEPTION);
-        }
-        
-        //parse the initiator message and perform appropriate action
-        parseInitiatorMessage(initMesg);
+        // exchange initiator messages to check that master and slave are at 
+        // the same version...
+        parseAndAckVersion(readMessage(), dbname);
+        // ...and have equal log files
+        parseAndAckInstant(readMessage(), synchOnInstant, dbname);
     }
     
     /**
@@ -204,17 +204,32 @@
      * in the initiator message, with the UID of the same class in the slave.
      *
      * @param initiatorMessage the object containing the UID.
+     * @param dbname the name of the replicated database
      *
      * @throws IOException If an exception occurs while sending the
      *                     acknowledgment.
      *
      * @throws StandardException If the UID's do not match.
      */
-    private void parseInitiatorMessage(ReplicationMessage initiatorMessage)
+    private void parseAndAckVersion(ReplicationMessage initiatorMessage, 
+                                    String dbname)
         throws IOException, StandardException {
         //Holds the replication message that will be sent
         //to the master.
         ReplicationMessage ack = null;
+
+        //Check if this message is an initiate version message, if not
+        //throw an exception
+        if (initiatorMessage.getType() != 
+                ReplicationMessage.TYPE_INITIATE_VERSION) {
+            // The message format was not recognized. Notify master and throw
+            // an exception
+            String expectedMsgId = String.
+                valueOf(ReplicationMessage.TYPE_INITIATE_VERSION);
+            String receivedMsgId = String.valueOf(initiatorMessage.getType());
+            handleUnexpectedMessage(dbname, expectedMsgId, receivedMsgId);
+        }
+
         //Get the UID of the master
         long masterVersion = ((Long)initiatorMessage.getMessage()).longValue();
         //If the UID's are equal send the acknowledgment message
@@ -223,17 +238,119 @@
                 (ReplicationMessage.TYPE_ACK, "UID OK");
             socketConn.writeMessage(ack);
         } else {
-            //If the UID's are not equal send an error message
+            //If the UID's are not equal send an error message. The
+            //object of a TYPE_ERROR message must be a String[]
             ack = new ReplicationMessage
                 (ReplicationMessage.TYPE_ERROR,
-                SQLState.REPLICATION_MASTER_SLAVE_VERSION_MISMATCH);
-            
+                 new String[]{SQLState.
+                              REPLICATION_MASTER_SLAVE_VERSION_MISMATCH});
+            socketConn.writeMessage(ack);
+
             //The UID's do not match.
             throw StandardException.newException
                 (SQLState.REPLICATION_MASTER_SLAVE_VERSION_MISMATCH);
         }
     }
+
+    /**
+     * Used to parse the log instant initiator message from the master and 
+     * check that the master and slave log files are in synch.
+     *
+     * @param initiatorMessage the object containing the UID.
+     * @param synchOnInstant the slave log instant, used to check that
+     * the master and slave log files are in synch. If no chunks of log
+     * records have been received from the master yet, this is the
+     * end position in the current log file. If a chunk of log has been
+     * received, this is the instant of the log record received last.
+     * Note that there is a difference!
+     * @param dbname the name of the replicated database
+     *
+     * @throws IOException If an exception occurs while sending the
+     *                     acknowledgment.
+     *
+     * @throws StandardException If the log files are not in synch
+     */
+    private void parseAndAckInstant(ReplicationMessage initiatorMessage,
+                                    long synchOnInstant, String dbname)
+        throws IOException, StandardException {
+        ReplicationMessage ack = null;
+
+        //Check if this message is a log synch message, if not throw
+        //an exception
+        if (initiatorMessage.getType() !=
+            ReplicationMessage.TYPE_INITIATE_INSTANT) {
+            // The message format was not recognized. Notify master and throw 
+            // an exception
+            String expectedMsgId = String.
+                valueOf(ReplicationMessage.TYPE_INITIATE_INSTANT);
+            String receivedMsgId = String.valueOf(initiatorMessage.getType());
+            handleUnexpectedMessage(dbname, expectedMsgId, receivedMsgId);
+        }
+
+        // Get the log instant of the master
+        long masterInstant = ((Long)initiatorMessage.getMessage()).longValue();
+
+        if (masterInstant == synchOnInstant) {
+            // Notify the master that the logs are in synch
+            ack = new ReplicationMessage
+                (ReplicationMessage.TYPE_ACK, "Instant OK");
+            socketConn.writeMessage(ack);
+        } else {
+            // Notify master that the logs are out of synch
+            // See ReplicationMessage#TYPE_ERROR
+            String[] exception = new String[6];
+            exception[0] = dbname;
+            exception[1] = String.valueOf(LogCounter.
+                                          getLogFileNumber(masterInstant));
+            exception[2] = String.valueOf(LogCounter.
+                                          getLogFilePosition(masterInstant));
+            exception[3] = String.valueOf(LogCounter.
+                                          getLogFileNumber(synchOnInstant));
+            exception[4] = String.valueOf(LogCounter.
+                                          getLogFilePosition(synchOnInstant));
+            exception[5] = SQLState.REPLICATION_LOG_OUT_OF_SYNCH;
+            ack = new ReplicationMessage(ReplicationMessage.TYPE_ERROR, 
+                                         exception);
+            socketConn.writeMessage(ack);
+
+            throw StandardException.
+                newException(SQLState.REPLICATION_LOG_OUT_OF_SYNCH, exception);
+        }
+    }
     
+    /**
+     * Notify other replication peer that the message type was unexpected and 
+     * throw a StandardException
+     *
+     * @param dbname the name of the replicated database
+     * @param expextedMsgId the expected message type
+     * @param receivedMsgId the received message type
+     *
+     * @throws StandardException exception describing that an unexpected
+     * message was received is always thrown 
+     * @throws java.io.IOException thrown if an exception occurs while sending
+     * the error message 
+     */
+    private void handleUnexpectedMessage(String dbname, 
+                                         String expextedMsgId,
+                                         String receivedMsgId)
+        throws StandardException, IOException {
+        String[] exception = new String[4];
+        exception[0] = dbname;
+        exception[1] = expextedMsgId;
+        exception[2] = receivedMsgId;
+        exception[3] = SQLState.REPLICATION_UNEXPECTED_MESSAGEID;
+
+        ReplicationMessage ack = 
+            new ReplicationMessage(ReplicationMessage.TYPE_ERROR, exception);
+
+        socketConn.writeMessage(ack);
+
+        throw StandardException.
+            newException(SQLState.REPLICATION_UNEXPECTED_MESSAGEID, exception);
+
+    }
+
     /**
      * Used to send a replication message to the master.
      *

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java Fri Feb 22 05:54:08 2008
@@ -76,6 +76,12 @@
      * @param timeout the amount of time for which the connection should
      *                block before being established.
      *
+     * @param synchOnInstant the master log instant, used to check
+     * that the master and slave log files are in synch. If no chunks
+     * of log records have been shipped to the slave yet, this is the
+     * end position in the current log file. If a chunk of log has
+     * been shipped, this is the instant of the log record shipped
+     * last. Note that there is a difference!
      * @throws PrivilegedActionException if an exception occurs while trying
      *                                   to open a connection.
      *
@@ -89,7 +95,7 @@
      * @throws ClassNotFoundException Class of a serialized object cannot
      *         be found.
      */
-    public void initConnection(int timeout) throws
+    public void initConnection(int timeout, long synchOnInstant) throws
         PrivilegedActionException,
         IOException,
         StandardException,
@@ -116,10 +122,21 @@
         socketConn = new SocketConnection(s);
         
         //send the initiate message and receive acknowledgment
-        sendInitiatorAndReceiveAck();
+        sendInitiatorAndReceiveAck(synchOnInstant);
     }
     
     /**
+     * Tear down the network connection established with the
+     * other replication peer
+     *
+     * @throws IOException if an exception occurs while trying to tear
+     *                     down the network connection
+     */
+    public void tearDown() throws IOException {
+        socketConn.tearDown();
+    }
+
+    /**
      * Used to send a replication message to the slave.
      *
      * @param message a <code>ReplicationMessage</code> object that contains
@@ -151,45 +168,71 @@
     }
     
     /**
-     * Used to send a initiator message to the slave and receive information
-     * about the compatibility of the slave with the master. The slave 
-     * determines if the software versions are compatible by comparing the
-     * UID's of the <code>ReplicationMessage</code> of the master and the
-     * slave.
+     * Used to send initiator messages to the slave and receive
+     * information about the compatibility of the slave with the
+     * master. One message is used to check that the slave and master
+     * have the same software versions. A second message is used to
+     * check that the master and slave log files are in synch.
+     *
+     * @param synchOnInstant the master log instant, used to check
+     * that the master and slave log files are in synch. If no chunks
+     * of log records have been shipped to the slave yet, this is the
+     * end position in the current log file. If a chunk of log has
+     * been shipped, this is the instant of the log record shipped
+     * last. Note that there is a difference!
      *
      * @throws IOException if an exception occurs during the sending or
      *                     reading of the message.
      *
      * @throws StandardException If an error message is received from the
      *                           server indicating a mis-match in
-     *                           serialVersionUID.
+     *                           serialVersionUID or log files out of synch.
      *
      * @throws ClassNotFoundException Class of a serialized object cannot
      *                                be found.
      */
-    private void sendInitiatorAndReceiveAck() 
+    private void sendInitiatorAndReceiveAck(long synchOnInstant)
         throws IOException, StandardException, ClassNotFoundException {
-        //Build the initiator message with the serialVersionUID of the
-        //ReplicationMessage.
-        ReplicationMessage initiatorMsg = new ReplicationMessage
-            (ReplicationMessage.TYPE_INITIATE, new Long(
-            ReplicationMessage.serialVersionUID));
-        
-        //send the initiator message to the slave.
+        // Check that master and slave have the same serialVersionUID
+        ReplicationMessage initiatorMsg = 
+            new ReplicationMessage(ReplicationMessage.TYPE_INITIATE_VERSION, 
+                                   new Long(ReplicationMessage.
+                                            serialVersionUID));
         sendMessage(initiatorMsg);
-        
-        //read the acknowledgment from the slave.
-        ReplicationMessage ack = readMessage();
-        
-        
+        verifyMessageAck(readMessage());
+
+        // Check that master and slave log files are in synch
+        initiatorMsg =
+            new ReplicationMessage(ReplicationMessage.TYPE_INITIATE_INSTANT,
+                                   new Long(synchOnInstant));
+        sendMessage(initiatorMsg);
+        verifyMessageAck(readMessage());
+    }
+
+    /**
+     * Used to parse a message received from the slave. If the message
+     * is an ack of the last shipped message, this method terminates
+     * quietly. Otherwise, it throws the exception received in the
+     * message from the slave describing why the last message could
+     * not be acked.
+     *
+     * @param ack the message received from the slave in reply to the
+     *            last message sent by the master.
+     *
+     * @throws StandardException If an error message is received from
+     *                           the slave.
+     */
+    private void verifyMessageAck(ReplicationMessage ack) 
+        throws StandardException {
         //If the message is a TYPE_ACK the slave is capable
         //of handling the messages and is at a compatible database version.
         if (ack.getType() == ReplicationMessage.TYPE_ACK) {
             return;
         } else if (ack.getType() == ReplicationMessage.TYPE_ERROR) {
-            //The UID's do not match.
-            throw StandardException.newException
-                ((String)ack.getMessage());
+            // See ReplicationMessage#TYPE_ERROR
+            String exception[] = (String[])ack.getMessage();
+            throw StandardException.
+                newException(exception[exception.length - 1], exception);
         } else {
             //The message format was not recognized. Hence throw
             //an unexpected exception.

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/slave/SlaveController.java Fri Feb 22 05:54:08 2008
@@ -31,6 +31,7 @@
 import org.apache.derby.iapi.services.monitor.Monitor;
 
 import org.apache.derby.iapi.store.raw.RawStoreFactory;
+import org.apache.derby.impl.store.raw.log.LogCounter;
 import org.apache.derby.iapi.store.raw.log.LogFactory;
 import org.apache.derby.impl.store.raw.log.LogToFile;
 
@@ -81,11 +82,17 @@
     private int slaveport;
     private String dbname; // The name of the replicated database
 
-    // Whether or not replication slave mode is still on. Will be set
-    // to false when slave replication is shut down. The value of this
-    // variable is checked after every timeout when trying to set up a
-    // connection to the master, and by the thread that applies log
-    // chunks received from the master.
+    /** The instant of the latest log record received from the master 
+     * and processed so far. Used to check that master and slave log files 
+     * are in synch */
+    private volatile long highestLogInstant = -1;
+
+    /**
+     * Whether or not replication slave mode is still on. Will be set
+     * to false when slave replication is shut down. The value of this
+     * variable is checked after every timeout when trying to set up a
+     * connection to the master, and by the thread that applies log
+     * chunks received from the master. */
     private volatile boolean inReplicationSlaveMode = true;
 
     /** Whether or not this SlaveController has been successfully
@@ -262,8 +269,7 @@
             ReplicationLogger.logError(null, ioe, dbname);
         }
 
-        logToFile.flushAll();
-        logToFile.stopReplicationSlaveMode();
+        logToFile.stopReplicationSlaveRole();
 
         Monitor.logTextMessage(MessageId.REPLICATION_SLAVE_STOPPED, dbname);
     }
@@ -337,8 +343,24 @@
     private boolean setupConnection() throws StandardException {
 
         try {
-            // timeout to check if still in replication slave mode
-            receiver.initConnection(DEFAULT_SOCKET_TIMEOUT);
+            // highestLogInstant is -1 until the first log chunk has
+            // been received from the master. If a log chunk has been
+            // received, use the instant of the latest received log
+            // record to synchronize log files. If no log has been
+            // received yet, use the end position of the log (i.e.,
+            // logToFile.getFirstUnflushedInstantAsLong)
+            if (highestLogInstant != -1) {
+                // timeout to check if still in replication slave mode
+                receiver.initConnection(DEFAULT_SOCKET_TIMEOUT,
+                                        highestLogInstant,
+                                        dbname);
+            } else {
+                // timeout to check if still in replication slave mode
+                receiver.initConnection(DEFAULT_SOCKET_TIMEOUT,
+                                        logToFile.
+                                        getFirstUnflushedInstantAsLong(),
+                                        dbname);
+            }
             connectedToMaster = true;
             return true; // will not reach this if timeout
         } catch (StandardException se) {
@@ -536,10 +558,17 @@
                         throw StandardException.newException
                             (SQLState.REPLICATION_LOG_OUT_OF_SYNCH,
                              dbname,
-                             new Long(logScan.getInstant()),
-                             new Long(localInstant));
-
+                             new Long(LogCounter.
+                                      getLogFileNumber(logScan.getInstant())),
+                             new Long(LogCounter.
+                                      getLogFilePosition(logScan.
+                                                         getInstant())),
+                             new Long(LogCounter.
+                                      getLogFileNumber(localInstant)),
+                             new Long(LogCounter.
+                                      getLogFilePosition(localInstant)));
                     }
+                    highestLogInstant = localInstant;
                 }
             }
         }

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/LogToFile.java Fri Feb 22 05:54:08 2008
@@ -4352,6 +4352,12 @@
 		return new LogCounter(logFileNumber,lastFlush);
 	}
 
+    public synchronized long getFirstUnflushedInstantAsLong() {
+        if (SanityManager.DEBUG) {
+            SanityManager.ASSERT(logFileNumber > 0 && lastFlush > 0);
+        }
+        return LogCounter.makeLogInstantAsLong(logFileNumber,lastFlush);
+    }
 
 	/**
 	 * Backup restore - stop sending log record to the log stream
@@ -5085,13 +5091,17 @@
      * recovery process and throw a StandardException with SQLState
      * SHUTDOWN_DATABASE. This should only be done when the database
      * will be shutdown.
+     * @throws StandardException Standard Derby exception policy
      * @see org.apache.derby.impl.db.SlaveDatabase
      */
-    public void stopReplicationSlaveMode() {
+    public void stopReplicationSlaveRole() throws StandardException {
         // Do not set inReplicationSlaveMode=false here because that
         // will let the thread currently doing recover complete the
         // boot process. Setting replicationSlaveException aborts the
         // boot process.
+        if (!stopped) {
+            flushAll();
+        }
         replicationSlaveException =
                 StandardException.newException(
                 SQLState.SHUTDOWN_DATABASE);

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/ReadOnly.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/ReadOnly.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/ReadOnly.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/store/raw/log/ReadOnly.java Fri Feb 22 05:54:08 2008
@@ -227,6 +227,13 @@
 		return null;
 	}
 
+	public long getFirstUnflushedInstantAsLong() {
+		if (SanityManager.DEBUG) {
+			SanityManager.THROWASSERT("functionality not implemented");
+		}
+		return LogCounter.INVALID_LOG_INSTANT;
+	}
+
 	/**
 	  @exception StandardException functionality not implmented
 	  */

Modified: db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml Fri Feb 22 05:54:08 2008
@@ -4760,10 +4760,12 @@
 
             <msg>
                 <name>XRE05</name>
-                <text>The log received from the master is not in synch with the local log for replicated database '{0}'. The received log instant is {1}, whereas the local instant is {2}. This is FATAL for replication - replication will be stopped.</text>
+                <text>The log files on the master and slave are not in synch for replicated database '{0}'. The master log instant is {1}:{2}, whereas the slave log instant is {3}:{4}. This is FATAL for replication - replication will be stopped.</text>
                 <arg>dbname</arg>
-                <arg>masterinstant</arg>
-                <arg>slaveinstant</arg>
+                <arg>masterfile</arg>
+                <arg>masteroffset</arg>
+                <arg>slavefile</arg>
+                <arg>slaveoffset</arg>
             </msg>
             
             <msg>
@@ -4793,6 +4795,14 @@
                 <name>XRE10</name>
                 <text>Conflicting attributes specified. See reference manual for attributes allowed in combination with replication attribute '{0}'.</text>
                 <arg>attribute</arg>
+            </msg>
+            
+            <msg>
+                <name>XRE12</name>
+                <text>Replication network protocol error for database '{0}'. Expected message type '{1}', but received type '{2}'.</text>
+                <arg>dbname</arg>
+                <arg>expectedtype</arg>
+                <arg>receivedtype</arg>
             </msg>
             
             <msg>

Modified: db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java?rev=630207&r1=630206&r2=630207&view=diff
==============================================================================
--- db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java (original)
+++ db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/SQLState.java Fri Feb 22 05:54:08 2008
@@ -1775,6 +1775,7 @@
     String REPLICATION_SLAVE_STARTED_OK                            = "XRE08";
     String CANNOT_START_SLAVE_ALREADY_BOOTED                       = "XRE09";
     String REPLICATION_CONFLICTING_ATTRIBUTES                      = "XRE10";
+    String REPLICATION_UNEXPECTED_MESSAGEID                        = "XRE12";
     String REPLICATION_FAILOVER_SUCCESSFUL                         = "XRE20.D";
     String REPLICATION_FAILOVER_UNSUCCESSFUL                       = "XRE21";
     String REPLICATION_NOT_IN_SLAVE_MODE                           = "XRE40";