You are viewing a plain text version of this content. The canonical link for it is here.
Posted to derby-commits@db.apache.org by oy...@apache.org on 2007/10/03 11:06:38 UTC

svn commit: r581534 - in /db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master: AsynchronousLogShipper.java LogShipper.java MasterController.java

Author: oysteing
Date: Wed Oct  3 02:06:38 2007
New Revision: 581534

URL: http://svn.apache.org/viewvc?rev=581534&view=rev
Log:
DERBY-3064: Implement the LogShipper that will enable the shipping of Log records from the master to the slave
Contributed by Narayanan

Added:
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java   (with props)
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/LogShipper.java   (with props)
Modified:
    db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java

Added: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java?rev=581534&view=auto
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java (added)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java Wed Oct  3 02:06:38 2007
@@ -0,0 +1,214 @@
+/*
+ 
+   Derby - Class org.apache.derby.impl.services.replication.master.AsynchronousLogShipper
+ 
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ 
+ */
+
+package org.apache.derby.impl.services.replication.master;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.reference.SQLState;
+
+import org.apache.derby.impl.services.replication.buffer.ReplicationLogBuffer;
+import org.apache.derby.impl.services.replication.net.ReplicationMessage;
+import org.apache.derby.impl.services.replication.net.ReplicationMessageTransmit;
+
+/**
+ * <p>
+ * Does asynchronous shipping of log records from the master to the slave being
+ * replicated to. The implementation does not ship log records as soon as they
+ * become available in the log buffer (synchronously); instead, it ships log
+ * in the following two scenarios:
+ *
+ * 1) Periodically, i.e. at regular intervals of time.
+ *
+ * 2) when a request is sent from the master controller (force flushing of
+ *    the log buffer).
+ * </p>
+ * <p>
+ * The interval at which log shipping happens is configurable. The log shipper
+ * accepts the interval at which the shipping should be done during
+ * initialization.
+ *
+ * Periodic shipping and force flushing are aware of each other's timing,
+ * i.e. a force flush results in the time of the next periodic shipping being
+ * calculated from the force flush time.
+ * </p>
+ */
+public class AsynchronousLogShipper extends Thread implements LogShipper {
+    
+    /**
+     * Replication log buffer that contains the log records that need to
+     * be transmitted to the slave.
+     */
+    private final ReplicationLogBuffer logBuffer;
+    
+    /**
+     * Replication message transmitter that is used for the network
+     * transmission of the log records retrieved from the log buffer
+     * (on the master) to the slave being replicated to.
+     */
+    private final ReplicationMessageTransmit transmitter;
+    
+    /**
+     * Time interval (in milliseconds) at which the log shipping takes place.
+     */
+    private final long shippingInterval;
+    
+    /**
+     * Indicates whether a stop shipping request has been sent
+     * (true - stop shipping, false - shipping can continue). Volatile since
+     * it is written by the caller of stopLogShipment() and read by run().
+     */
+    private volatile boolean stopShipping = false;
+    
+    /**
+     * The master controller that initialized this log shipper.
+     */
+    private final MasterController masterController;
+    
+    /**
+     * Constructor initializes the log buffer, the replication message
+     * transmitter, the shipping interval and the master controller.
+     *
+     * @param logBuffer the replication log buffer that contains the log record
+     *                  chunks to be transmitted to the slave.
+     * @param transmitter the replication message transmitter that is used for
+     *                    network transmission of retrieved log records.
+     * @param shippingInterval a long value that stores the time interval in
+     *                         milliseconds at which the log shipping takes
+     *                         place.
+     * @param masterController The master controller that initialized this log
+     *                         shipper.
+     */
+    public AsynchronousLogShipper(ReplicationLogBuffer logBuffer,
+        ReplicationMessageTransmit transmitter,
+        long shippingInterval,
+        MasterController masterController) {
+        this.logBuffer = logBuffer;
+        this.transmitter = transmitter;
+        this.shippingInterval = shippingInterval;
+        this.masterController = masterController;
+    }
+    
+    /**
+     * Ships log records from the log buffer to the slave being replicated to.
+     * The log shipping happens at the configured shipping intervals unless a
+     * force flush happens, which triggers periodic shipping also since there
+     * will still be more log to send after the forceFlush has sent one chunk.
+     */
+    public void run() {
+        while (!stopShipping) {
+            try {
+                shipALogChunk();
+                synchronized(this) {
+                    //Re-check under the monitor so a stop request is not
+                    //slept through; a notify() ends the wait early.
+                    if (!stopShipping) {
+                        wait(shippingInterval);
+                    }
+                }
+            } catch (InterruptedException ie) {
+                //Ignore the Interrupted exception to enable stopping
+                //the shipping thread in a controlled way.
+            } catch (IOException ioe) {
+                masterController.handleExceptions(ioe);
+            } catch (StandardException se) {
+                masterController.handleExceptions(se);
+            }
+        }
+    }
+    
+    /**
+     * Retrieves a chunk of log records, if available, from the log buffer and
+     * transmits them to the slave. Used for both periodic and forced shipping.
+     *
+     * @throws IOException If an exception occurs while trying to ship the
+     *                     replication message (containing the log records)
+     *                     across the network.
+     * @throws StandardException If an exception occurs while trying to read
+     *                           log records from the log buffer.
+     */
+    private synchronized void shipALogChunk()
+    throws IOException, StandardException {
+        try {
+            if (logBuffer.next()) {
+                byte[] logRecords = logBuffer.getData();
+                ReplicationMessage mesg = new ReplicationMessage(
+                    ReplicationMessage.TYPE_LOG, logRecords);
+                transmitter.sendMessage(mesg);
+            }
+        } catch (NoSuchElementException nse) {
+            //Although next() returns true a request for data on the buffer
+            //fails implying that there has been a fatal exception in the
+            //buffer.
+            masterController.handleExceptions(StandardException.newException
+                (SQLState.REPLICATION_UNEXPECTED_EXCEPTION, nse));
+        }
+    }
+    
+    /**
+     * Transmits a chunk of log record from the log buffer to the slave, used
+     * by the master controller when the log buffer is full and some space
+     * needs to be freed for further log records.
+     *
+     * @throws IOException If an exception occurs while trying to ship the
+     *                     replication message (containing the log records)
+     *                     across the network.
+     * @throws StandardException If an exception occurs while trying to read
+     *                           log records from the log buffer.
+     */
+    public void forceFlush() throws IOException, StandardException {
+        shipALogChunk();
+        synchronized(this) {
+            //There will still be more log to send after the forceFlush
+            //has sent one chunk.  Notify the log shipping thread that
+            //it is time for another send.
+            notify();
+        }
+    }
+    
+    /**
+     * updates the information about the latest instance of the log record
+     * that has been flushed to the disk. Calling this method has no effect
+     * in this asynchronous implementation of the log shipper.
+     *
+     * @param latestInstanceFlushedToDisk a long that contains the latest
+     *        instance of the log record that has been flushed to the disk.
+     */
+    public void flushedInstance(long latestInstanceFlushedToDisk) {
+        //The asynchronous log shipper ignores the last flushed instance.
+    }
+    
+    /**
+     * Stop shipping log records. If a ship is currently in progress
+     * it will not be interrupted, shipping will stop only after the
+     * current shipment is done. A shipping thread that is waiting for the
+     * next shipping interval is woken up so that it can terminate.
+     */
+    public void stopLogShipment() {
+        stopShipping = true;
+        synchronized(this) {
+            //Wake the shipping thread if it is waiting for the next interval.
+            notify();
+        }
+    }
+}

Propchange: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/AsynchronousLogShipper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/LogShipper.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/LogShipper.java?rev=581534&view=auto
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/LogShipper.java (added)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/LogShipper.java Wed Oct  3 02:06:38 2007
@@ -0,0 +1,60 @@
+/*
+ 
+   Derby - Class org.apache.derby.impl.services.replication.master.LogShipper
+ 
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ 
+ */
+
+package org.apache.derby.impl.services.replication.master;
+
+import java.io.IOException;
+
+import org.apache.derby.iapi.error.StandardException;
+
+/**
+ *
+ * This is the interface for the replication log shipper. The log shipper
+ * is started by the master controller service. The log shipper is responsible
+ * for shipping of the log chunks from the log buffer (on the master) to the
+ * slave. The log shipper handles both periodic shipping of log records as well
+ * as request based shipping. The request based shipping would be useful when
+ * the log buffer becomes full and needs to be freed before it can store
+ * subsequent log chunks.
+ *
+ */
+interface LogShipper {
+    /**
+     * Ships the next chunk of log records, if one is available, from the
+     * log buffer to the slave.
+     *
+     * @throws IOException If an exception occurs while trying to ship the
+     *                     replication message (containing the log records)
+     *                     across the network.
+     * @throws StandardException If an exception occurs while trying to read
+     *                           log records from the log buffer.
+     */
+    void forceFlush() throws IOException, StandardException;
+    
+    /**
+     * Informs the log shipper of the latest instance of the log record that
+     * has been flushed to the disk.
+     *
+     * @param latestInstanceFlushedToDisk a long that contains the latest
+     *        instance of the log record that has been flushed to the disk.
+     */
+    void flushedInstance(long latestInstanceFlushedToDisk);
+}

Propchange: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/LogShipper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java?rev=581534&r1=581533&r2=581534&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/services/replication/master/MasterController.java Wed Oct  3 02:06:38 2007
@@ -271,5 +271,14 @@
     public void flushedTo(long instant) {
         // logShipper.flushedTo(instant); 
     }
-
+    
+    /**
+     * Used by the log shipper to report an exception that it encountered
+     * while shipping log records to the slave.
+     * NOTE: the body is currently empty, so the exception is discarded.
+     *
+     * @param exception the exception reported by the log shipper.
+     */
+    void handleExceptions(Exception exception) {
+    }
 }