You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/21 19:03:05 UTC
svn commit: r719659 - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/replication/
test/java/org/apache/kahadb/replication/ test/resources/broker1/
test/resources/broker2/
Author: chirino
Date: Fri Nov 21 10:03:05 2008
New Revision: 719659
URL: http://svn.apache.org/viewvc?rev=719659&view=rev
Log:
We now properly support forcing synchronous replication to slaves, which ensures that every update is always replicated to at least one slave.
Setting asyncReplication="true" disables this and allows a master to continue operating even if there are no slaves online. With asyncReplication="true" there are windows of time during which not all data has been replicated, so some data loss is possible.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Fri Nov 21 10:03:05 2008
@@ -23,8 +23,9 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,32 +51,37 @@
import org.apache.kahadb.store.KahaDBStore;
import org.apache.kahadb.util.ByteSequence;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
private static final Log LOG = LogFactory.getLog(ReplicationService.class);
- private final ReplicationService replicationServer;
+ private final ReplicationService replicationService;
private Object serverMutex = new Object() {};
private TransportServer server;
- private CopyOnWriteArrayList<ReplicationSession> sessions = new CopyOnWriteArrayList<ReplicationSession>();
- AtomicInteger nextSnapshotId = new AtomicInteger();
+ private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
+
+ private final AtomicInteger nextSnapshotId = new AtomicInteger();
+ private final Map<Location, CountDownLatch> requestMap = new LinkedHashMap<Location, CountDownLatch>();
- public ReplicationMaster(ReplicationService replication1Server) {
- this.replicationServer = replication1Server;
+ public ReplicationMaster(ReplicationService replicationService) {
+ this.replicationService = replicationService;
}
public void start() throws Exception {
synchronized (serverMutex) {
- server = TransportFactory.bind(new URI(replicationServer.getUri()));
+ server = TransportFactory.bind(new URI(replicationService.getUri()));
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) {
try {
synchronized (serverMutex) {
ReplicationSession session = new ReplicationSession(transport);
session.start();
- sessions.add(session);
+ addSession(session);
}
} catch (Exception e) {
LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
@@ -88,73 +94,204 @@
});
server.start();
}
- replicationServer.getStore().getJournal().setReplicationTarget(this);
- }
-
- public void stop() throws Exception {
- synchronized (serverMutex) {
- if (server != null) {
- server.stop();
- server = null;
- }
- }
+ replicationService.getStore().getJournal().setReplicationTarget(this);
}
+
+ boolean isStarted() {
+ synchronized (serverMutex) {
+ return server!=null;
+ }
+ }
+
+ public void stop() throws Exception {
+ replicationService.getStore().getJournal().setReplicationTarget(null);
+ synchronized (serverMutex) {
+ if (server != null) {
+ server.stop();
+ server = null;
+ }
+ }
+
+ ArrayList<ReplicationSession> sessionsSnapshot;
+ synchronized (this.sessions) {
+ sessionsSnapshot = this.sessions;
+ }
+
+ for (ReplicationSession session: sessionsSnapshot) {
+ session.stop();
+ }
+ }
+
+ protected void addSession(ReplicationSession session) {
+ synchronized (sessions) {
+ sessions = new ArrayList<ReplicationSession>(sessions);
+ sessions.add(session);
+ }
+ }
+
+ protected void removeSession(ReplicationSession session) {
+ synchronized (sessions) {
+ sessions = new ArrayList<ReplicationSession>(sessions);
+ sessions.remove(session);
+ }
+ }
public void onClusterChange(ClusterState config) {
// For now, we don't really care about changes in the slave config..
}
-
/**
* This is called by the Journal so that we can replicate the update to the
* slaves.
*/
public void replicate(Location location, ByteSequence sequence, boolean sync) {
- if( sessions.isEmpty() )
- return;
- ReplicationFrame frame = new ReplicationFrame();
- frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
- PBJournalUpdate payload = new PBJournalUpdate();
- payload.setLocation(ReplicationSupport.convert(location));
- payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
- frame.setPayload(payload);
-
- for (ReplicationSession session : sessions) {
+ ArrayList<ReplicationSession> sessionsSnapshot;
+ synchronized (this.sessions) {
+ // Hurrah for copy on write..
+ sessionsSnapshot = this.sessions;
+ }
+
+
+ // We may be configured to always do async replication..
+ if ( replicationService.isAsyncReplication() ) {
+ sync=false;
+ }
+ CountDownLatch latch=null;
+ if( sync ) {
+ latch = new CountDownLatch(1);
+ synchronized (requestMap) {
+ requestMap.put(location, latch);
+ }
+ }
+
+ ReplicationFrame frame=null;
+ for (ReplicationSession session : sessionsSnapshot) {
if( session.subscribedToJournalUpdates.get() ) {
+
+ // Lazily create the frame since we may not have any available sessions to send to.
+ if( frame == null ) {
+ frame = new ReplicationFrame();
+ frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
+ PBJournalUpdate payload = new PBJournalUpdate();
+ payload.setLocation(ReplicationSupport.convert(location));
+ payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
+ payload.setSendAck(sync);
+ frame.setPayload(payload);
+ }
+
// TODO: use async send threads so that the frames can be pushed out in parallel.
try {
+ session.setLastUpdateLocation(location);
session.transport.oneway(frame);
} catch (IOException e) {
session.onException(e);
}
}
}
+
+ if (sync) {
+ try {
+ int timeout = 500;
+ int counter=0;
+ while( true ) {
+ if( latch.await(timeout, TimeUnit.MILLISECONDS) ) {
+ synchronized (requestMap) {
+ requestMap.remove(location);
+ }
+ return;
+ }
+ if( !isStarted() ) {
+ return;
+ }
+ counter++;
+ if( (counter%10)==0 ) {
+ LOG.warn("KahaDB is waiting for slave to come online. "+(timeout*counter/1000.f)+" seconds have elapsed.");
+ }
+ }
+ } catch (InterruptedException ignore) {
+ }
+ }
+
}
+
+ private void ackAllFromTo(Location lastAck, Location newAck) {
+ if ( replicationService.isAsyncReplication() ) {
+ return;
+ }
+
+ ArrayList<Entry<Location, CountDownLatch>> entries;
+ synchronized (requestMap) {
+ entries = new ArrayList<Entry<Location, CountDownLatch>>(requestMap.entrySet());
+ }
+ boolean inRange=false;
+ for (Entry<Location, CountDownLatch> entry : entries) {
+ Location l = entry.getKey();
+ if( !inRange ) {
+ if( lastAck==null || lastAck.compareTo(l) < 0 ) {
+ inRange=true;
+ }
+ }
+ if( inRange ) {
+ entry.getValue().countDown();
+ if( newAck!=null && l.compareTo(newAck) <= 0 ) {
+ return;
+ }
+ }
+ }
+ }
+
class ReplicationSession implements Service, TransportListener {
private final Transport transport;
private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+ private boolean stopped;
private File snapshotFile;
private HashSet<Integer> journalReplicatedFiles;
- private boolean online;
+ private Location lastAckLocation;
+ private Location lastUpdateLocation;
+ private boolean online;
public ReplicationSession(Transport transport) {
this.transport = transport;
}
- public void start() throws Exception {
+ synchronized public void setLastUpdateLocation(Location lastUpdateLocation) {
+ this.lastUpdateLocation = lastUpdateLocation;
+ }
+
+ public void start() throws Exception {
transport.setTransportListener(this);
transport.start();
}
- public void stop() throws Exception {
- deleteReplicationData();
- transport.stop();
+ synchronized public void stop() throws Exception {
+ if ( !stopped ) {
+ stopped=true;
+ deleteReplicationData();
+ transport.stop();
+ }
}
- public void onCommand(Object command) {
+ synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
+ Location ack = ReplicationSupport.convert(location);
+ if( online ) {
+ ackAllFromTo(lastAckLocation, ack);
+ }
+ lastAckLocation=ack;
+ }
+
+ synchronized private void onSlaveOnline(ReplicationFrame frame) {
+ deleteReplicationData();
+ online = true;
+ if( lastAckLocation!=null ) {
+ ackAllFromTo(null, lastAckLocation);
+ }
+
+ }
+
+ public void onCommand(Object command) {
try {
ReplicationFrame frame = (ReplicationFrame) command;
switch (frame.getHeader().getType()) {
@@ -193,11 +330,6 @@
public void transportResumed() {
}
- private void onSlaveOnline(ReplicationFrame frame) {
- online = true;
- deleteReplicationData();
- }
-
private void deleteReplicationData() {
if( snapshotFile!=null ) {
snapshotFile.delete();
@@ -229,7 +361,7 @@
}
- final KahaDBStore store = replicationServer.getStore();
+ final KahaDBStore store = replicationService.getStore();
store.checkpoint(new Callback() {
public void execute() throws Exception {
// This call back is executed once the checkpoint is
@@ -238,11 +370,14 @@
// that no updates are done while we are in this
// method.
- KahaDBStore store = replicationServer.getStore();
-
+ KahaDBStore store = replicationService.getStore();
+ if( lastAckLocation==null ) {
+ lastAckLocation = store.getLastUpdatePosition();
+ }
+
int snapshotId = nextSnapshotId.incrementAndGet();
File file = store.getPageFile().getFile();
- File dir = replicationServer.getTempReplicationDir();
+ File dir = replicationService.getTempReplicationDir();
dir.mkdirs();
snapshotFile = new File(dir, "snapshot-" + snapshotId);
@@ -294,7 +429,6 @@
}
rcPayload.setDeleteFilesList(deleteFiles);
-
updateJournalReplicatedFiles();
}
@@ -304,7 +438,7 @@
}
private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
- File file = replicationServer.getReplicationFile(fileInfo.getName());
+ File file = replicationService.getReplicationFile(fileInfo.getName());
long payloadSize = fileInfo.getEnd()-fileInfo.getStart();
if( file.length() < fileInfo.getStart()+payloadSize ) {
@@ -334,17 +468,20 @@
* it does not delete them while the replication is occuring.
*/
private void updateJournalReplicatedFiles() {
- HashSet<Integer> files = replicationServer.getStore().getJournalFilesBeingReplicated();
+ HashSet<Integer> files = replicationService.getStore().getJournalFilesBeingReplicated();
files.clear();
- for (ReplicationSession session : sessions) {
+
+ ArrayList<ReplicationSession> sessionsSnapshot;
+ synchronized (this.sessions) {
+ // Hurrah for copy on write..
+ sessionsSnapshot = this.sessions;
+ }
+
+ for (ReplicationSession session : sessionsSnapshot) {
if( session.journalReplicatedFiles !=null ) {
files.addAll(session.journalReplicatedFiles);
}
}
}
-
- private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation journalLocation) {
- }
-
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java Fri Nov 21 10:03:05 2008
@@ -47,6 +47,7 @@
private File tempReplicationDir;
private String uri;
private ClusterStateManager cluster;
+ private boolean asyncReplication=false;
private KahaDBStore store;
@@ -278,5 +279,14 @@
this.cluster = cluster;
}
+ public void setAsyncReplication(boolean asyncReplication) {
+ this.asyncReplication = asyncReplication;
+ }
+
+ public boolean isAsyncReplication() {
+ return asyncReplication;
+ }
+
+
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Fri Nov 21 10:03:05 2008
@@ -255,6 +255,20 @@
}
private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
+
+ // Send an ack back as soon as we receive the update. Yes, it's a little dirty to ack before it's on disk,
+ // but chances are low that both machines are going to lose power at the same time, and this way
+ // we reduce the latency the master sees from us.
+ if( update.getSendAck() ) {
+ ReplicationFrame ack = new ReplicationFrame();
+ ack.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE_ACK));
+ ack.setPayload(update.getLocation());
+ transport.oneway(ack);
+ }
+
+ // TODO: actually do the disk write in an async thread so that this thread can
+ // start reading in the next journal update.
+
boolean onlineRecovery=false;
PBJournalLocation location = update.getLocation();
byte[] data = update.getData().toByteArray();
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Nov 21 10:03:05 2008
@@ -49,6 +49,7 @@
StaticClusterStateManager cluster = new StaticClusterStateManager();
ReplicationService rs1 = new ReplicationService();
+ rs1.setAsyncReplication(true);
rs1.setUri(BROKER1_REPLICATION_ID);
rs1.setCluster(cluster);
rs1.setDirectory(new File("target/replication-test/broker1"));
@@ -56,6 +57,7 @@
rs1.start();
ReplicationService rs2 = new ReplicationService();
+ rs2.setAsyncReplication(true);
rs2.setUri(BROKER2_REPLICATION_ID);
rs2.setCluster(cluster);
rs2.setDirectory(new File("target/replication-test/broker2"));
Modified: activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml (original)
+++ activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml Fri Nov 21 10:03:05 2008
@@ -32,7 +32,8 @@
<kahadb-replication
directory="target/kaha-data/broker1"
brokerURI="xbean:broker1/ha-broker.xml"
- uri="kdbr://localhost:6001">
+ uri="kdbr://localhost:6001"
+ asyncReplication="true">
<cluster>
<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
Modified: activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml (original)
+++ activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml Fri Nov 21 10:03:05 2008
@@ -32,7 +32,8 @@
<kahadb-replication
directory="target/kaha-data-broker2"
brokerURI="xbean:broker2/ha-broker.xml"
- uri="kdbr://localhost:6002">
+ uri="kdbr://localhost:6002"
+ asyncReplication="true">
<cluster>
<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>