You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/11 21:10:05 UTC

svn commit: r713149 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/replication/ main/java/org/apache/kahadb/store/ main/proto/ test/java/org/apache/kahadb/replication/

Author: chirino
Date: Tue Nov 11 12:10:04 2008
New Revision: 713149

URL: http://svn.apache.org/viewvc?rev=713149&view=rev
Log:
Added better slave synchronization handling on the master side.  We now create a snapshot for each slave session and clean up the snapshot once the slave is online.

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 11 12:10:04 2008
@@ -22,6 +22,9 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,16 +55,19 @@
 
 import com.google.protobuf.ByteString;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
 public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
 
 	private static final Log LOG = LogFactory.getLog(ReplicationServer.class);
 
 	private final ReplicationServer replicationServer;
 
-	private Object serverMutex = new Object() {
-	};
+	private Object serverMutex = new Object() {};
 	private TransportServer server;
 	private CopyOnWriteArrayList<ReplicationSession> sessions = new CopyOnWriteArrayList<ReplicationSession>();
+	
+	AtomicInteger nextSnapshotId = new AtomicInteger();
 
 	public ReplicationMaster(ReplicationServer replication1Server) {
 		this.replicationServer = replication1Server;
@@ -136,6 +142,10 @@
 
 		private final Transport transport;
 		private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+		
+		private File snapshotFile;
+		private HashSet<Integer> journalReplicatedFiles;
+		private boolean online;
 
 		public ReplicationSession(Transport transport) {
 			this.transport = transport;
@@ -147,6 +157,7 @@
 		}
 
 		public void stop() throws Exception {
+			deleteReplicationData();
 			transport.stop();
 		}
 
@@ -155,9 +166,11 @@
 				ReplicationFrame frame = (ReplicationFrame) command;
 				switch (frame.getHeader().getType()) {
 				case SLAVE_INIT:
-					subscribedToJournalUpdates.set(true);
 					onSlaveInit(frame, (PBSlaveInit) frame.getPayload());
 					break;
+				case SLAVE_ONLINE:
+					onSlaveOnline(frame);
+					break;
 				case FILE_TRANSFER:
 					onFileTransfer(frame, (PBFileInfo) frame.getPayload());
 					break;
@@ -187,8 +200,27 @@
 		public void transportResumed() {
 		}
 		
+		private void onSlaveOnline(ReplicationFrame frame) {
+			online = true;
+			deleteReplicationData();
+		}
+
+		private void deleteReplicationData() {
+			if( snapshotFile!=null ) {
+				snapshotFile.delete();
+				snapshotFile=null;
+			}
+			if( journalReplicatedFiles!=null ) {
+				journalReplicatedFiles=null;
+				updateJournalReplicatedFiles();
+			}
+		}
+
 		private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
 
+			// Start sending journal updates to the slave.
+			subscribedToJournalUpdates.set(true);
+
 			// We could look at the slave state sent in the slaveInit and decide
 			// that a full sync is not needed..
 			// but for now we will do a full sync every time.
@@ -197,37 +229,78 @@
 			rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
 			rc.setPayload(rcPayload);
 			
+			// Setup a map of all the files that the slave has
+			final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
+			for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
+				slaveFiles.put(info.getName(), info);
+			}
+			
+			
 			final KahaDBStore store = replicationServer.getStore();
 			store.checkpoint(new Callback() {
 				public void execute() throws Exception {
 					// This call back is executed once the checkpoint is
-					// completed and all data has been
-					// synced to disk, but while a lock is still held on the
-					// store so that no
-					// updates are allowed.
-					ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>();
+					// completed and all data has been synced to disk, 
+					// but while a lock is still held on the store so 
+					// that no updates are done while we are in this
+					// method.
+					
+					KahaDBStore store = replicationServer.getStore();
 
+					int snapshotId = nextSnapshotId.incrementAndGet();
+					File file = store.getPageFile().getFile();
+					snapshotFile = new File(file.getParentFile(), "snapshot-" + snapshotId);
+					
+					journalReplicatedFiles = new HashSet<Integer>();
+					
+					// Store the list of files associated with the snapshot.
+					ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
 					Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
 					for (DataFile df : journalFiles.values()) {
-						infos.add(replicationServer.createInfo("journal-" + df.getDataFileId(), df.getFile(), df.getLength()));
+						// Look at what the slave has so that only the missing bits are transferred.
+						String name = "journal-" + df.getDataFileId();
+						PBFileInfo slaveInfo = slaveFiles.get(name);
+						
+						// Use the checksum info to see if the slave has the file already.. Checksums are less accurate for
+						// small amounts of data.. so ignore small files.
+						if( slaveInfo!=null && slaveInfo.getEnd()> 1024*512 ) {
+							// If the slave's file checksum matches what we have..
+							if( replicationServer.checksum(df.getFile(), 0, slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
+								// is Our file longer? then we need to continue transferring the rest of the file.
+								if( df.getLength() > slaveInfo.getEnd() ) {
+									snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
+									journalReplicatedFiles.add(df.getDataFileId());
+									continue;
+								} else {
+									// No need to replicate this file.
+									continue;
+								}
+							} 
+						}
+						
+						// If we got here then it means we need to transfer the whole file.
+						snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
+						journalReplicatedFiles.add(df.getDataFileId());
 					}
+
+					PBFileInfo info = new PBFileInfo();
+					info.setName("database");
+					info.setSnapshotId(snapshotId);
+					info.setStart(0);
+					info.setEnd(file.length());
+					info.setChecksum(copyAndChecksum(file, snapshotFile));
+					snapshotInfos.add(info);
 					
-					SnapshotStatus snapshot = createSnapshot();
-					PBFileInfo databaseInfo = new PBFileInfo();
-					databaseInfo.setName("database");
-					databaseInfo.setSnapshotId(snapshot.id);
-					databaseInfo.setStart(0);
-					databaseInfo.setEnd(snapshot.size);
-					databaseInfo.setChecksum(snapshot.checksum);
-					infos.add(databaseInfo);
+					rcPayload.setCopyFilesList(snapshotInfos);
 					
-					rcPayload.setCopyFilesList(infos);
+					updateJournalReplicatedFiles();
 				}
+
 			});
 			
 			transport.oneway(rc);
 		}
-
+		
 		private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
 			File file = replicationServer.getReplicationFile(fileInfo.getName());
 			long payloadSize = fileInfo.getEnd()-fileInfo.getStart();
@@ -254,38 +327,20 @@
 
 	}
 
-	static class SlaveStatus {
-		String salve_id;
-		PBJournalLocation lastAck;
-		Integer syncingSnapshot;
-	}
-
-	static class SnapshotStatus {
-		int id;
-		File file;
-		long checksum;
-		PBJournalLocation lastJournalLocation;
-		long size;
-	}
-
-
-
-	int nextSnapshotId;
-	SnapshotStatus currentSnapshot;
-	private SnapshotStatus createSnapshot() throws IOException {
-		if (currentSnapshot == null) {
-			currentSnapshot = new SnapshotStatus();
-			currentSnapshot.id = nextSnapshotId++;
-			KahaDBStore store = replicationServer.getStore();
-			File file = store.getPageFile().getFile();
-			currentSnapshot.file = new File(file.getParentFile(), "snapshot-" + currentSnapshot.id);
-			currentSnapshot.checksum = copyAndChecksum(file, currentSnapshot.file);
-			currentSnapshot.lastJournalLocation = convert(store.getJournal().getLastAppendLocation());
-			currentSnapshot.size = currentSnapshot.file.length();
+	/**
+	 * Looks at all the journal files being currently replicated and informs the KahaDB so that
+	 * it does not delete them while the replication is occurring.
+	 */
+	private void updateJournalReplicatedFiles() {
+		HashSet<Integer>  files = replicationServer.getStore().getJournalFilesBeingReplicated();
+		files.clear();
+		for (ReplicationSession session : sessions) {
+			if( session.journalReplicatedFiles !=null ) {
+				files.addAll(session.journalReplicatedFiles);
+			}
 		}
-		return currentSnapshot;
 	}
-
+	
 	private PBJournalLocation convert(Location loc) {
 		if( loc==null ) {
 			return null;
@@ -313,11 +368,11 @@
 		} finally {
 			try {
 				is.close();
-			} finally {
+			} catch(Throwable e) {
 			}
 			try {
 				os.close();
-			} finally {
+			} catch(Throwable e) {
 			}
 		}
 	}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Tue Nov 11 12:10:04 2008
@@ -17,8 +17,8 @@
 package org.apache.kahadb.replication;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
@@ -211,24 +211,36 @@
 		}
 	}
 	
-	PBFileInfo createInfo(String name, File file, long length) throws IOException {
+	PBFileInfo createInfo(String name, File file, long start, long length) throws IOException {
 		PBFileInfo rc = new PBFileInfo();
 		rc.setName(name);
-		FileInputStream is = new FileInputStream(file);
-		byte buffer[] = new byte[1024 * 4];
-		int c;
-
-		long size = 0;
-		Checksum checksum = new Adler32();
-		while (size < length && (c = is.read(buffer, 0, (int) Math.min(length - size, buffer.length))) >= 0) {
-			checksum.update(buffer, 0, c);
-			size += c;
-		}
-		rc.setChecksum(checksum.getValue());
-		rc.setStart(0);
-		rc.setEnd(size);
+		rc.setChecksum(checksum(file, start, length));
+		rc.setStart(start);
+		rc.setEnd(length);
 		return rc;
 	}
+	
+	long checksum(File file, long start, long end) throws IOException {
+		RandomAccessFile raf = new RandomAccessFile(file, "r");
+		try {
+			Checksum checksum = new Adler32();
+			byte buffer[] = new byte[1024 * 4];
+			int c;
+			long pos = start;
+			raf.seek(start);
+			
+			while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) {
+				checksum.update(buffer, 0, c);
+				pos += c;
+			}
+			
+			return checksum.getValue();
+		} finally {
+			try { raf.close(); } catch (Throwable e){}
+		}
+	}
+
+	
 	public boolean isMaster() {
 		return master!=null;
 	}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 11 12:10:04 2008
@@ -98,11 +98,11 @@
 						continue;
 					}
 					
-					PBFileInfo info = replicationServer.createInfo("database", file, file.length());
+					PBFileInfo info = replicationServer.createInfo("database", file, 0, file.length());
 					info.setSnapshotId(snapshot);
 					infosMap.put("database", info);
 				} else if( name.startsWith("journal-") ) {
-					PBFileInfo info = replicationServer.createInfo(name, file, file.length());
+					PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length());
 					infosMap.put(name, info);
 				}
 			}
@@ -117,12 +117,12 @@
 			if( infosMap.containsKey(name) ) {
 				continue;
 			}
-			infosMap.put(name, replicationServer.createInfo(name, df.getFile(), df.getLength()));
+			infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
 		}
 		if( !infosMap.containsKey("database") ) {
 			File pageFile = store.getPageFile().getFile();
 			if( pageFile.exists() ) {
-				infosMap.put("database", replicationServer.createInfo("database", pageFile, pageFile.length()));
+				infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
 			}
 		}
 		
@@ -208,13 +208,15 @@
 					// Once the data has been synced.. we are going to 
 					// go into an online recovery mode...
 					file = replicationServer.getReplicationFile(name);
-					onlineRecovery=true;
 				}
 				journalUpateFile = new RandomAccessFile(file, "rw");
 				journalUpdateFileId = location.getFileId();
-			}
+			}			
 			journalUpateFile.seek(location.getOffset());
 			journalUpateFile.write(data);
+			if( !bulkSynchronizing ) {
+				onlineRecovery=true;
+			}
 		}
 		
 		if( onlineRecovery ) {
@@ -236,8 +238,12 @@
 	private void commitBulkTransfer() throws IOException {
 		synchronized (transferMutex) {
 			
-			journalUpateFile.close();
-			journalUpateFile=null;
+			LOG.info("Slave synhcronization complete, going online...");
+
+			if( journalUpateFile!=null ) {
+				journalUpateFile.close();
+				journalUpateFile=null;
+			}
 			replicationServer.getStore().close();
 			
 			// If we got a new snapshot of the database, then we need to 
@@ -258,7 +264,15 @@
 			bulkSynchronizing=false;
 			
 			replicationServer.getStore().open();
+			
+			LOG.info("Slave is now online.  We are now eligible to become the master.");
 		}
+		
+		// Let the master know we are now online.
+		ReplicationFrame frame = new ReplicationFrame();
+		frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
+		transport.oneway(frame);
+		
 		replicationServer.getStore().incrementalRecover();
 	}
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Nov 11 12:10:04 2008
@@ -417,19 +417,16 @@
 
     
 	public void checkpoint(Callback closure) throws Exception {
-        try {
-            synchronized (indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, false);
-                    }
-                });
-                pageFile.flush();
-                closure.execute();
-            }
-            store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
-        } catch (IOException e) {
+        synchronized (indexMutex) {
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    checkpointUpdate(tx, false);
+                }
+            });
+            pageFile.flush();
+            closure.execute();
         }
+        store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
 	}
 
     // /////////////////////////////////////////////////////////////////
@@ -619,6 +616,7 @@
     // /////////////////////////////////////////////////////////////////
 
     protected final Object indexMutex = new Object();
+	private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
 
     private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
@@ -775,7 +773,7 @@
     
                 });
             }
-            
+            inUseFiles.addAll(journalFilesBeingReplicated);
             Location l = metadata.lastUpdate;
             if( metadata.firstInProgressTransactionLocation!=null ) {
                 l = metadata.firstInProgressTransactionLocation;
@@ -787,12 +785,17 @@
         
         LOG.debug("Checkpoint done.");
     }
+    
+    public HashSet<Integer> getJournalFilesBeingReplicated() {
+		return journalFilesBeingReplicated;
+	}
 
     // /////////////////////////////////////////////////////////////////
     // StoredDestination related implementation methods.
     // /////////////////////////////////////////////////////////////////
 
-    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+
+	private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
 
     class StoredSubscription {
         SubscriptionInfo subscriptionInfo;

Modified: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Tue Nov 11 12:10:04 2008
@@ -42,14 +42,19 @@
 	// @followed-by PBSlaveInitResponse	
 	SLAVE_INIT_RESPONSE = 1;
   
+  	// The Slave will send this command to the master once he has completed 
+  	// all his bulk synchronizations and he is ready to take over as being a master. 
+	//    
+	// @followed-by null	
+  	SLAVE_ONLINE=2;
+  	
 	// Sent from the Master to the slave to replicate a Journal update.
 	//    
 	// @followed-by PBJournalUpdate	
 	JOURNAL_UPDATE=3;	
 	
-	// An ack sent from the Slave to a master to let the master know up to where in the journal the slave has
-	// synchronized to.  This acknowledges receipt of all previous journal records.  This should not be sent until
-	// all bulk file copies are complete.
+	// An ack sent back to the master in response to a received 
+	// JOURNAL_UPDATE
 	//    
 	// @followed-by PBJournalLocation	
 	JOURNAL_UPDATE_ACK=4;
@@ -91,9 +96,15 @@
 	// The files that the slave should delete
 	repeated string delete_files=2;
 }
-
 message PBJournalUpdate {
+    // Journal location of the update.
     required PBJournalLocation location=1;
+    // The data that will be written at that location.
     required bytes data=2;
+    // Should the slave send back an ack for this update.
+    optional bool send_ack=3;
+    // If true, then the slave should do a disk sync before returning a
+    // JOURNAL_UPDATE_ACK
+    optional bool disk_sync=4;
 }
 

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Tue Nov 11 12:10:04 2008
@@ -21,7 +21,6 @@
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -77,7 +76,7 @@
 		cluster.setClusterState(clusterState);
 		
 		try {
-			sendMesagesTo(100, BROKER1_URI);
+			sendMesagesTo(BROKER1_URI, 100, "Pass 1: ");
 		} catch( JMSException e ) {
 			fail("b1 did not become a master.");
 		}
@@ -93,12 +92,12 @@
 		
 		
 		try {
-			sendMesagesTo(100, BROKER1_URI);
+			sendMesagesTo(BROKER1_URI, 100, "Pass 2: ");
 		} catch( JMSException e ) {
 			fail("Failed to send more messages...");
 		}
 		
-		Thread.sleep(1000);
+		Thread.sleep(2000);
 		
 		// Make broker 2 the master.
 		clusterState = new ClusterState();
@@ -133,14 +132,14 @@
 		}
 	}
 
-	private void sendMesagesTo(int count, String brokerUri) throws JMSException {
+	private void sendMesagesTo(String brokerUri, int count, String msg) throws JMSException {
 		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
 		Connection con = cf.createConnection();
 		try {
 			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
 			MessageProducer producer = session.createProducer(destination);
 			for (int i = 0; i < count; i++) {
-				producer.send(session.createTextMessage("Hello: "+i));
+				producer.send(session.createTextMessage(msg+i));
 			}
 		} finally {
 			try { con.close(); } catch (Throwable e) {}