You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/21 19:03:05 UTC

svn commit: r719659 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/replication/ test/java/org/apache/kahadb/replication/ test/resources/broker1/ test/resources/broker2/

Author: chirino
Date: Fri Nov 21 10:03:05 2008
New Revision: 719659

URL: http://svn.apache.org/viewvc?rev=719659&view=rev
Log:
We now properly support forcing synchronous replication to slaves, which ensures that every update is always replicated to at least one slave.

Setting the asyncReplication="true" option disables this forced synchronous replication and allows a master to continue operating even if there are no slaves online.  When asyncReplication="true", you allow windows of time during which not all data has been replicated, and therefore you could have some data loss.

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
    activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Fri Nov 21 10:03:05 2008
@@ -23,8 +23,9 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -50,32 +51,37 @@
 import org.apache.kahadb.store.KahaDBStore;
 import org.apache.kahadb.util.ByteSequence;
 
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
 public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
 
 	private static final Log LOG = LogFactory.getLog(ReplicationService.class);
 
-	private final ReplicationService replicationServer;
+	private final ReplicationService replicationService;
 
 	private Object serverMutex = new Object() {};
 	private TransportServer server;
-	private CopyOnWriteArrayList<ReplicationSession> sessions = new CopyOnWriteArrayList<ReplicationSession>();
 	
-	AtomicInteger nextSnapshotId = new AtomicInteger();
+	private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
+	
+	private final AtomicInteger nextSnapshotId = new AtomicInteger();
+    private final Map<Location, CountDownLatch> requestMap = new LinkedHashMap<Location, CountDownLatch>();
 
-	public ReplicationMaster(ReplicationService replication1Server) {
-		this.replicationServer = replication1Server;
+	public ReplicationMaster(ReplicationService replicationService) {
+		this.replicationService = replicationService;
 	}
 
 	public void start() throws Exception {
 		synchronized (serverMutex) {
-			server = TransportFactory.bind(new URI(replicationServer.getUri()));
+			server = TransportFactory.bind(new URI(replicationService.getUri()));
 			server.setAcceptListener(new TransportAcceptListener() {
 				public void onAccept(Transport transport) {
 					try {
 						synchronized (serverMutex) {
 							ReplicationSession session = new ReplicationSession(transport);
 							session.start();
-							sessions.add(session);
+							addSession(session);
 						}
 					} catch (Exception e) {
 						LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
@@ -88,73 +94,204 @@
 			});
 			server.start();
 		}
-		replicationServer.getStore().getJournal().setReplicationTarget(this);
-	}
-
-	public void stop() throws Exception {
-		synchronized (serverMutex) {
-			if (server != null) {
-				server.stop();
-				server = null;
-			}
-		}
+		replicationService.getStore().getJournal().setReplicationTarget(this);
 	}
+	
+    boolean isStarted() {
+        synchronized (serverMutex) {
+            return server!=null;
+        }
+    }
+    
+    public void stop() throws Exception {
+        replicationService.getStore().getJournal().setReplicationTarget(null);
+        synchronized (serverMutex) {
+            if (server != null) {
+                server.stop();
+                server = null;
+            }
+        }
+        
+        ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            sessionsSnapshot = this.sessions;
+        }
+        
+        for (ReplicationSession session: sessionsSnapshot) {
+            session.stop();
+        }
+    }
+
+	protected void addSession(ReplicationSession session) {
+	    synchronized (sessions) {
+	        sessions = new ArrayList<ReplicationSession>(sessions);
+	        sessions.add(session);
+        }
+    }
+	
+    protected void removeSession(ReplicationSession session) {
+        synchronized (sessions) {
+            sessions = new ArrayList<ReplicationSession>(sessions);
+            sessions.remove(session);
+        }
+    }
 
 	public void onClusterChange(ClusterState config) {
 		// For now, we don't really care about changes in the slave config..
 	}
 
-
 	/**
 	 * This is called by the Journal so that we can replicate the update to the 
 	 * slaves.
 	 */
 	public void replicate(Location location, ByteSequence sequence, boolean sync) {
-		if( sessions.isEmpty() ) 
-			return;
-		ReplicationFrame frame = new ReplicationFrame();
-		frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
-		PBJournalUpdate payload = new PBJournalUpdate();
-		payload.setLocation(ReplicationSupport.convert(location));
-		payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
-		frame.setPayload(payload);
-
-		for (ReplicationSession session : sessions) {
+	    ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            // Hurrah for copy on write..
+            sessionsSnapshot = this.sessions;
+        }
+	    
+
+        // We may be configured to always do async replication..
+		if ( replicationService.isAsyncReplication() ) {
+		    sync=false;
+		}
+		CountDownLatch latch=null;
+		if( sync ) {
+    		latch = new CountDownLatch(1);
+            synchronized (requestMap) {
+                requestMap.put(location, latch);
+            }
+		}
+		
+		ReplicationFrame frame=null;
+		for (ReplicationSession session : sessionsSnapshot) {
 			if( session.subscribedToJournalUpdates.get() ) {
+			    
+			    // Lazily create the frame since we may not have any available sessions to send to.
+			    if( frame == null ) {
+    		        frame = new ReplicationFrame();
+                    frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
+                    PBJournalUpdate payload = new PBJournalUpdate();
+                    payload.setLocation(ReplicationSupport.convert(location));
+                    payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
+                    payload.setSendAck(sync);
+                    frame.setPayload(payload);
+			    }
+
 				// TODO: use async send threads so that the frames can be pushed out in parallel. 
 				try {
+				    session.setLastUpdateLocation(location);
 					session.transport.oneway(frame);
 				} catch (IOException e) {
 					session.onException(e);
 				}
 			}
 		}
+		
+        if (sync) {
+            try {
+                int timeout = 500;
+                int counter=0;
+                while( true ) {
+                    if( latch.await(timeout, TimeUnit.MILLISECONDS) ) {
+                        synchronized (requestMap) {
+                            requestMap.remove(location);
+                        }
+                        return;
+                    }
+                    if( !isStarted() ) {
+                        return;
+                    }
+                    counter++;
+                    if( (counter%10)==0 ) {
+                        LOG.warn("KahaDB is waiting for slave to come online. "+(timeout*counter/1000.f)+" seconds have elapsed.");
+                    }
+                } 
+            } catch (InterruptedException ignore) {
+            }
+        }
+		
 	}
+	
+    private void ackAllFromTo(Location lastAck, Location newAck) {
+        if ( replicationService.isAsyncReplication() ) {
+            return;
+        }
+        
+        ArrayList<Entry<Location, CountDownLatch>> entries;
+        synchronized (requestMap) {
+            entries = new ArrayList<Entry<Location, CountDownLatch>>(requestMap.entrySet());
+        }
+        boolean inRange=false;
+        for (Entry<Location, CountDownLatch> entry : entries) {
+            Location l = entry.getKey();
+            if( !inRange ) {
+                if( lastAck==null || lastAck.compareTo(l) < 0 ) {
+                    inRange=true;
+                }
+            }
+            if( inRange ) {
+                entry.getValue().countDown();
+                if( newAck!=null && l.compareTo(newAck) <= 0 ) {
+                    return;
+                }
+            }
+        }
+    }
+
 
 	class ReplicationSession implements Service, TransportListener {
 
 		private final Transport transport;
 		private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+        private boolean stopped;
 		
 		private File snapshotFile;
 		private HashSet<Integer> journalReplicatedFiles;
-		private boolean online;
+		private Location lastAckLocation;
+        private Location lastUpdateLocation;
+        private boolean online;
 
 		public ReplicationSession(Transport transport) {
 			this.transport = transport;
 		}
 
-		public void start() throws Exception {
+		synchronized public void setLastUpdateLocation(Location lastUpdateLocation) {
+            this.lastUpdateLocation = lastUpdateLocation;
+        }
+
+        public void start() throws Exception {
 			transport.setTransportListener(this);
 			transport.start();
 		}
 
-		public void stop() throws Exception {
-			deleteReplicationData();
-			transport.stop();
+        synchronized public void stop() throws Exception {
+		    if ( !stopped  ) { 
+		        stopped=true;
+    			deleteReplicationData();
+    			transport.stop();
+		    }
 		}
 
-		public void onCommand(Object command) {
+		synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
+            Location ack = ReplicationSupport.convert(location);
+		    if( online ) {
+                ackAllFromTo(lastAckLocation, ack);
+		    }
+            lastAckLocation=ack;
+	    }
+		
+		synchronized private void onSlaveOnline(ReplicationFrame frame) {
+            deleteReplicationData();
+            online  = true;
+            if( lastAckLocation!=null ) {
+                ackAllFromTo(null, lastAckLocation);
+            }
+            
+        }
+
+        public void onCommand(Object command) {
 			try {
 				ReplicationFrame frame = (ReplicationFrame) command;
 				switch (frame.getHeader().getType()) {
@@ -193,11 +330,6 @@
 		public void transportResumed() {
 		}
 		
-		private void onSlaveOnline(ReplicationFrame frame) {
-			online = true;
-			deleteReplicationData();
-		}
-
 		private void deleteReplicationData() {
 			if( snapshotFile!=null ) {
 				snapshotFile.delete();
@@ -229,7 +361,7 @@
 			}
 			
 			
-			final KahaDBStore store = replicationServer.getStore();
+			final KahaDBStore store = replicationService.getStore();
 			store.checkpoint(new Callback() {
 				public void execute() throws Exception {
 					// This call back is executed once the checkpoint is
@@ -238,11 +370,14 @@
 					// that no updates are done while we are in this
 					// method.
 					
-					KahaDBStore store = replicationServer.getStore();
-
+					KahaDBStore store = replicationService.getStore();
+					if( lastAckLocation==null ) {
+					    lastAckLocation = store.getLastUpdatePosition();
+					}
+					
 					int snapshotId = nextSnapshotId.incrementAndGet();
 					File file = store.getPageFile().getFile();
-					File dir = replicationServer.getTempReplicationDir();
+					File dir = replicationService.getTempReplicationDir();
 					dir.mkdirs();
 					snapshotFile = new File(dir, "snapshot-" + snapshotId);
 					
@@ -294,7 +429,6 @@
 					}
 					rcPayload.setDeleteFilesList(deleteFiles);
 					
-					
 					updateJournalReplicatedFiles();
 				}
 
@@ -304,7 +438,7 @@
 		}
 		
 		private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
-			File file = replicationServer.getReplicationFile(fileInfo.getName());
+			File file = replicationService.getReplicationFile(fileInfo.getName());
 			long payloadSize = fileInfo.getEnd()-fileInfo.getStart();
 			
 			if( file.length() < fileInfo.getStart()+payloadSize ) {
@@ -334,17 +468,20 @@
 	 * it does not delete them while the replication is occuring.
 	 */
 	private void updateJournalReplicatedFiles() {
-		HashSet<Integer>  files = replicationServer.getStore().getJournalFilesBeingReplicated();
+		HashSet<Integer>  files = replicationService.getStore().getJournalFilesBeingReplicated();
 		files.clear();
-		for (ReplicationSession session : sessions) {
+
+        ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            // Hurrah for copy on write..
+            sessionsSnapshot = this.sessions;
+        }
+        
+		for (ReplicationSession session : sessionsSnapshot) {
 			if( session.journalReplicatedFiles !=null ) {
 				files.addAll(session.journalReplicatedFiles);
 			}
 		}
 	}
 	
-
-	private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation journalLocation) {
-	}
-
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java Fri Nov 21 10:03:05 2008
@@ -47,6 +47,7 @@
     private File tempReplicationDir;
     private String uri;
     private ClusterStateManager cluster;
+    private boolean asyncReplication=false;
     
     private KahaDBStore store;
 
@@ -278,5 +279,14 @@
         this.cluster = cluster;
     }
 
+    public void setAsyncReplication(boolean asyncReplication) {
+        this.asyncReplication = asyncReplication;
+    }
+
+    public boolean isAsyncReplication() {
+        return asyncReplication;
+    }
+
+
 
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Fri Nov 21 10:03:05 2008
@@ -255,6 +255,20 @@
 	}
 	
 	private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
+	    
+	    // Send an ack back as soon as we get the update.. yeah it's a little dirty to ack before it's on disk,
+	    // but chances are low that both machines are going to lose power at the same time and this way,
+	    // we reduce the latency the master sees from us.
+	    if( update.getSendAck() ) {
+	        ReplicationFrame ack = new ReplicationFrame();
+	        ack.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE_ACK));
+	        ack.setPayload(update.getLocation());
+	        transport.oneway(ack);
+	    }
+	    
+	    // TODO: actually do the disk write in an async thread so that this thread can
+	    // start reading in the next journal update.
+	    
 		boolean onlineRecovery=false;
 		PBJournalLocation location = update.getLocation();
 		byte[] data = update.getData().toByteArray();

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Nov 21 10:03:05 2008
@@ -49,6 +49,7 @@
 		StaticClusterStateManager cluster = new StaticClusterStateManager();
 		
 		ReplicationService rs1 = new ReplicationService();
+		rs1.setAsyncReplication(true);
 		rs1.setUri(BROKER1_REPLICATION_ID);
 		rs1.setCluster(cluster);
 		rs1.setDirectory(new File("target/replication-test/broker1"));
@@ -56,6 +57,7 @@
 		rs1.start();
 
         ReplicationService rs2 = new ReplicationService();
+        rs2.setAsyncReplication(true);
         rs2.setUri(BROKER2_REPLICATION_ID);
         rs2.setCluster(cluster);
         rs2.setDirectory(new File("target/replication-test/broker2"));

Modified: activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml (original)
+++ activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml Fri Nov 21 10:03:05 2008
@@ -32,7 +32,8 @@
 	  <kahadb-replication
     	directory="target/kaha-data/broker1" 
     	brokerURI="xbean:broker1/ha-broker.xml" 
-    	uri="kdbr://localhost:6001">
+    	uri="kdbr://localhost:6001"
+    	asyncReplication="true">
     	
     	<cluster>
     		<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>

Modified: activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml (original)
+++ activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml Fri Nov 21 10:03:05 2008
@@ -32,7 +32,8 @@
 	  <kahadb-replication
     	directory="target/kaha-data-broker2" 
     	brokerURI="xbean:broker2/ha-broker.xml" 
-    	uri="kdbr://localhost:6002">
+    	uri="kdbr://localhost:6002"
+    	asyncReplication="true">
     	
     	<cluster>
     		<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>