You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/11 21:10:05 UTC
svn commit: r713149 - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/replication/ main/java/org/apache/kahadb/store/
main/proto/ test/java/org/apache/kahadb/replication/
Author: chirino
Date: Tue Nov 11 12:10:04 2008
New Revision: 713149
URL: http://svn.apache.org/viewvc?rev=713149&view=rev
Log:
Added better slave synchronization handling on the master side. We now create a snapshot for each slave session and clean up the snapshot once the slave is online.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 11 12:10:04 2008
@@ -22,6 +22,9 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,16 +55,19 @@
import com.google.protobuf.ByteString;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
private static final Log LOG = LogFactory.getLog(ReplicationServer.class);
private final ReplicationServer replicationServer;
- private Object serverMutex = new Object() {
- };
+ private Object serverMutex = new Object() {};
private TransportServer server;
private CopyOnWriteArrayList<ReplicationSession> sessions = new CopyOnWriteArrayList<ReplicationSession>();
+
+ AtomicInteger nextSnapshotId = new AtomicInteger();
public ReplicationMaster(ReplicationServer replication1Server) {
this.replicationServer = replication1Server;
@@ -136,6 +142,10 @@
private final Transport transport;
private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+
+ private File snapshotFile;
+ private HashSet<Integer> journalReplicatedFiles;
+ private boolean online;
public ReplicationSession(Transport transport) {
this.transport = transport;
@@ -147,6 +157,7 @@
}
public void stop() throws Exception {
+ deleteReplicationData();
transport.stop();
}
@@ -155,9 +166,11 @@
ReplicationFrame frame = (ReplicationFrame) command;
switch (frame.getHeader().getType()) {
case SLAVE_INIT:
- subscribedToJournalUpdates.set(true);
onSlaveInit(frame, (PBSlaveInit) frame.getPayload());
break;
+ case SLAVE_ONLINE:
+ onSlaveOnline(frame);
+ break;
case FILE_TRANSFER:
onFileTransfer(frame, (PBFileInfo) frame.getPayload());
break;
@@ -187,8 +200,27 @@
public void transportResumed() {
}
+ private void onSlaveOnline(ReplicationFrame frame) {
+ online = true;
+ deleteReplicationData();
+ }
+
+ private void deleteReplicationData() {
+ if( snapshotFile!=null ) {
+ snapshotFile.delete();
+ snapshotFile=null;
+ }
+ if( journalReplicatedFiles!=null ) {
+ journalReplicatedFiles=null;
+ updateJournalReplicatedFiles();
+ }
+ }
+
private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
+ // Start sending journal updates to the slave.
+ subscribedToJournalUpdates.set(true);
+
// We could look at the slave state sent in the slaveInit and decide
// that a full sync is not needed..
// but for now we will do a full sync every time.
@@ -197,37 +229,78 @@
rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
rc.setPayload(rcPayload);
+ // Setup a map of all the files that the slave has
+ final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
+ for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
+ slaveFiles.put(info.getName(), info);
+ }
+
+
final KahaDBStore store = replicationServer.getStore();
store.checkpoint(new Callback() {
public void execute() throws Exception {
// This call back is executed once the checkpoint is
- // completed and all data has been
- // synced to disk, but while a lock is still held on the
- // store so that no
- // updates are allowed.
- ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>();
+ // completed and all data has been synced to disk,
+ // but while a lock is still held on the store so
+ // that no updates are done while we are in this
+ // method.
+
+ KahaDBStore store = replicationServer.getStore();
+ int snapshotId = nextSnapshotId.incrementAndGet();
+ File file = store.getPageFile().getFile();
+ snapshotFile = new File(file.getParentFile(), "snapshot-" + snapshotId);
+
+ journalReplicatedFiles = new HashSet<Integer>();
+
+ // Store the list of files associated with the snapshot.
+ ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
for (DataFile df : journalFiles.values()) {
- infos.add(replicationServer.createInfo("journal-" + df.getDataFileId(), df.getFile(), df.getLength()));
+ // Look at what the slave has so that only the missing bits are transferred.
+ String name = "journal-" + df.getDataFileId();
+ PBFileInfo slaveInfo = slaveFiles.get(name);
+
+ // Use the checksum info to see if the slave has the file already.. Checksums are less accurate for
+ // small amounts of data.. so ignore small files.
+ if( slaveInfo!=null && slaveInfo.getEnd()> 1024*512 ) {
+ // If the slave's file checksum matches what we have..
+ if( replicationServer.checksum(df.getFile(), 0, slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
+ // is Our file longer? then we need to continue transferring the rest of the file.
+ if( df.getLength() > slaveInfo.getEnd() ) {
+ snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
+ journalReplicatedFiles.add(df.getDataFileId());
+ continue;
+ } else {
+ // No need to replicate this file.
+ continue;
+ }
+ }
+ }
+
+ // If we got here then it means we need to transfer the whole file.
+ snapshotInfos.add(replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
+ journalReplicatedFiles.add(df.getDataFileId());
}
+
+ PBFileInfo info = new PBFileInfo();
+ info.setName("database");
+ info.setSnapshotId(snapshotId);
+ info.setStart(0);
+ info.setEnd(file.length());
+ info.setChecksum(copyAndChecksum(file, snapshotFile));
+ snapshotInfos.add(info);
- SnapshotStatus snapshot = createSnapshot();
- PBFileInfo databaseInfo = new PBFileInfo();
- databaseInfo.setName("database");
- databaseInfo.setSnapshotId(snapshot.id);
- databaseInfo.setStart(0);
- databaseInfo.setEnd(snapshot.size);
- databaseInfo.setChecksum(snapshot.checksum);
- infos.add(databaseInfo);
+ rcPayload.setCopyFilesList(snapshotInfos);
- rcPayload.setCopyFilesList(infos);
+ updateJournalReplicatedFiles();
}
+
});
transport.oneway(rc);
}
-
+
private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
File file = replicationServer.getReplicationFile(fileInfo.getName());
long payloadSize = fileInfo.getEnd()-fileInfo.getStart();
@@ -254,38 +327,20 @@
}
- static class SlaveStatus {
- String salve_id;
- PBJournalLocation lastAck;
- Integer syncingSnapshot;
- }
-
- static class SnapshotStatus {
- int id;
- File file;
- long checksum;
- PBJournalLocation lastJournalLocation;
- long size;
- }
-
-
-
- int nextSnapshotId;
- SnapshotStatus currentSnapshot;
- private SnapshotStatus createSnapshot() throws IOException {
- if (currentSnapshot == null) {
- currentSnapshot = new SnapshotStatus();
- currentSnapshot.id = nextSnapshotId++;
- KahaDBStore store = replicationServer.getStore();
- File file = store.getPageFile().getFile();
- currentSnapshot.file = new File(file.getParentFile(), "snapshot-" + currentSnapshot.id);
- currentSnapshot.checksum = copyAndChecksum(file, currentSnapshot.file);
- currentSnapshot.lastJournalLocation = convert(store.getJournal().getLastAppendLocation());
- currentSnapshot.size = currentSnapshot.file.length();
+ /**
+ * Looks at all the journal files being currently replicated and informs the KahaDB so that
+ * it does not delete them while the replication is occurring.
+ */
+ private void updateJournalReplicatedFiles() {
+ HashSet<Integer> files = replicationServer.getStore().getJournalFilesBeingReplicated();
+ files.clear();
+ for (ReplicationSession session : sessions) {
+ if( session.journalReplicatedFiles !=null ) {
+ files.addAll(session.journalReplicatedFiles);
+ }
}
- return currentSnapshot;
}
-
+
private PBJournalLocation convert(Location loc) {
if( loc==null ) {
return null;
@@ -313,11 +368,11 @@
} finally {
try {
is.close();
- } finally {
+ } catch(Throwable e) {
}
try {
os.close();
- } finally {
+ } catch(Throwable e) {
}
}
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Tue Nov 11 12:10:04 2008
@@ -17,8 +17,8 @@
package org.apache.kahadb.replication;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
@@ -211,24 +211,36 @@
}
}
- PBFileInfo createInfo(String name, File file, long length) throws IOException {
+ PBFileInfo createInfo(String name, File file, long start, long length) throws IOException {
PBFileInfo rc = new PBFileInfo();
rc.setName(name);
- FileInputStream is = new FileInputStream(file);
- byte buffer[] = new byte[1024 * 4];
- int c;
-
- long size = 0;
- Checksum checksum = new Adler32();
- while (size < length && (c = is.read(buffer, 0, (int) Math.min(length - size, buffer.length))) >= 0) {
- checksum.update(buffer, 0, c);
- size += c;
- }
- rc.setChecksum(checksum.getValue());
- rc.setStart(0);
- rc.setEnd(size);
+ rc.setChecksum(checksum(file, start, length));
+ rc.setStart(start);
+ rc.setEnd(length);
return rc;
}
+
+ long checksum(File file, long start, long end) throws IOException {
+ RandomAccessFile raf = new RandomAccessFile(file, "r");
+ try {
+ Checksum checksum = new Adler32();
+ byte buffer[] = new byte[1024 * 4];
+ int c;
+ long pos = start;
+ raf.seek(start);
+
+ while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) {
+ checksum.update(buffer, 0, c);
+ pos += c;
+ }
+
+ return checksum.getValue();
+ } finally {
+ try { raf.close(); } catch (Throwable e){}
+ }
+ }
+
+
public boolean isMaster() {
return master!=null;
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 11 12:10:04 2008
@@ -98,11 +98,11 @@
continue;
}
- PBFileInfo info = replicationServer.createInfo("database", file, file.length());
+ PBFileInfo info = replicationServer.createInfo("database", file, 0, file.length());
info.setSnapshotId(snapshot);
infosMap.put("database", info);
} else if( name.startsWith("journal-") ) {
- PBFileInfo info = replicationServer.createInfo(name, file, file.length());
+ PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length());
infosMap.put(name, info);
}
}
@@ -117,12 +117,12 @@
if( infosMap.containsKey(name) ) {
continue;
}
- infosMap.put(name, replicationServer.createInfo(name, df.getFile(), df.getLength()));
+ infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
}
if( !infosMap.containsKey("database") ) {
File pageFile = store.getPageFile().getFile();
if( pageFile.exists() ) {
- infosMap.put("database", replicationServer.createInfo("database", pageFile, pageFile.length()));
+ infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
}
}
@@ -208,13 +208,15 @@
// Once the data has been synced.. we are going to
// go into an online recovery mode...
file = replicationServer.getReplicationFile(name);
- onlineRecovery=true;
}
journalUpateFile = new RandomAccessFile(file, "rw");
journalUpdateFileId = location.getFileId();
- }
+ }
journalUpateFile.seek(location.getOffset());
journalUpateFile.write(data);
+ if( !bulkSynchronizing ) {
+ onlineRecovery=true;
+ }
}
if( onlineRecovery ) {
@@ -236,8 +238,12 @@
private void commitBulkTransfer() throws IOException {
synchronized (transferMutex) {
- journalUpateFile.close();
- journalUpateFile=null;
+ LOG.info("Slave synhcronization complete, going online...");
+
+ if( journalUpateFile!=null ) {
+ journalUpateFile.close();
+ journalUpateFile=null;
+ }
replicationServer.getStore().close();
// If we got a new snapshot of the database, then we need to
@@ -258,7 +264,15 @@
bulkSynchronizing=false;
replicationServer.getStore().open();
+
+ LOG.info("Slave is now online. We are now eligible to become the master.");
}
+
+ // Let the master know we are now online.
+ ReplicationFrame frame = new ReplicationFrame();
+ frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
+ transport.oneway(frame);
+
replicationServer.getStore().incrementalRecover();
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Nov 11 12:10:04 2008
@@ -417,19 +417,16 @@
public void checkpoint(Callback closure) throws Exception {
- try {
- synchronized (indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, false);
- }
- });
- pageFile.flush();
- closure.execute();
- }
- store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
- } catch (IOException e) {
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, false);
+ }
+ });
+ pageFile.flush();
+ closure.execute();
}
+ store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
}
// /////////////////////////////////////////////////////////////////
@@ -619,6 +616,7 @@
// /////////////////////////////////////////////////////////////////
protected final Object indexMutex = new Object();
+ private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
@@ -775,7 +773,7 @@
});
}
-
+ inUseFiles.addAll(journalFilesBeingReplicated);
Location l = metadata.lastUpdate;
if( metadata.firstInProgressTransactionLocation!=null ) {
l = metadata.firstInProgressTransactionLocation;
@@ -787,12 +785,17 @@
LOG.debug("Checkpoint done.");
}
+
+ public HashSet<Integer> getJournalFilesBeingReplicated() {
+ return journalFilesBeingReplicated;
+ }
// /////////////////////////////////////////////////////////////////
// StoredDestination related implementation methods.
// /////////////////////////////////////////////////////////////////
- private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+
+ private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
class StoredSubscription {
SubscriptionInfo subscriptionInfo;
Modified: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Tue Nov 11 12:10:04 2008
@@ -42,14 +42,19 @@
// @followed-by PBSlaveInitResponse
SLAVE_INIT_RESPONSE = 1;
+ // The Slave will send this command to the master once he has completed
+ // all his bulk synchronizations and he is ready to take over as being a master.
+ //
+ // @followed-by null
+ SLAVE_ONLINE=2;
+
// Sent from the Master to the slave to replicate a Journal update.
//
// @followed-by PBJournalUpdate
JOURNAL_UPDATE=3;
- // An ack sent from the Slave to a master to let the master know up to where in the journal the slave has
- // synchronized to. This acknowledges receipt of all previous journal records. This should not be sent until
- // all bulk file copies are complete.
+ // An ack sent back to the master in response to a received
+ // JOURNAL_UPDATE
//
// @followed-by PBJournalLocation
JOURNAL_UPDATE_ACK=4;
@@ -91,9 +96,15 @@
// The files that the slave should delete
repeated string delete_files=2;
}
-
message PBJournalUpdate {
+ // Journal location of the update.
required PBJournalLocation location=1;
+ // The data that will be written at that location.
required bytes data=2;
+ // Should the slave send back an ack for this update.
+ optional bool send_ack=3;
+ // If true, then the slave should do a disk sync before returning a
+ // JOURNAL_UPDATE_ACK
+ optional bool disk_sync=4;
}
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=713149&r1=713148&r2=713149&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Tue Nov 11 12:10:04 2008
@@ -21,7 +21,6 @@
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -77,7 +76,7 @@
cluster.setClusterState(clusterState);
try {
- sendMesagesTo(100, BROKER1_URI);
+ sendMesagesTo(BROKER1_URI, 100, "Pass 1: ");
} catch( JMSException e ) {
fail("b1 did not become a master.");
}
@@ -93,12 +92,12 @@
try {
- sendMesagesTo(100, BROKER1_URI);
+ sendMesagesTo(BROKER1_URI, 100, "Pass 2: ");
} catch( JMSException e ) {
fail("Failed to send more messages...");
}
- Thread.sleep(1000);
+ Thread.sleep(2000);
// Make broker 2 the master.
clusterState = new ClusterState();
@@ -133,14 +132,14 @@
}
}
- private void sendMesagesTo(int count, String brokerUri) throws JMSException {
+ private void sendMesagesTo(String brokerUri, int count, String msg) throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
Connection con = cf.createConnection();
try {
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage("Hello: "+i));
+ producer.send(session.createTextMessage(msg+i));
}
} finally {
try { con.close(); } catch (Throwable e) {}