You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/18 15:30:36 UTC
svn commit: r718615 - in /activemq/sandbox/kahadb: ./
src/main/java/org/apache/kahadb/replication/
src/main/java/org/apache/kahadb/store/
src/test/java/org/apache/kahadb/replication/
src/test/java/org/apache/kahadb/store/perf/
Author: chirino
Date: Tue Nov 18 06:30:35 2008
New Revision: 718615
URL: http://svn.apache.org/viewvc?rev=718615&view=rev
Log:
The ReplicationServer now uses a BrokerFactory to create and destroy broker instances. This allows us to bring a master back online after it has been taken offline.
Removed:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java
Modified:
activemq/sandbox/kahadb/pom.xml
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
Modified: activemq/sandbox/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/pom.xml?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/pom.xml (original)
+++ activemq/sandbox/kahadb/pom.xml Tue Nov 18 06:30:35 2008
@@ -89,6 +89,12 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>2.5.5</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.0.0</version>
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 18 06:30:35 2008
@@ -69,7 +69,7 @@
public void start() throws Exception {
synchronized (serverMutex) {
- server = TransportFactory.bind(new URI(replicationServer.getNodeId()));
+ server = TransportFactory.bind(new URI(replicationServer.getUri()));
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) {
try {
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Tue Nov 18 06:30:35 2008
@@ -23,6 +23,9 @@
import java.util.zip.Checksum;
import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.page.PageFile;
@@ -41,32 +44,46 @@
private KahaDBStore store;
- private ReplicatedBrokerService brokerService;
+ private BrokerService brokerService;
- public ReplicationServer() {
- }
-
- public ReplicatedBrokerService getBrokerService() {
- return brokerService;
- }
+ private File directory = new File(IOHelper.getDefaultDataDirectory());
- public void setBrokerService(ReplicatedBrokerService brokerService) {
- this.brokerService = brokerService;
+ public ReplicationServer() {
}
public KahaDBStore getStore() {
+ if( store == null ) {
+ store = new KahaDBStore();
+ store.setDirectory(directory);
+ }
return store;
}
- public void setStore(KahaDBStore store) {
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ public String getBrokerURI() {
+ return brokerURI;
+ }
+
+ public void setBrokerURI(String brokerURI) {
+ this.brokerURI = brokerURI;
+ }
+
+ public void setStore(KahaDBStore store) {
this.store = store;
}
- public String getNodeId() {
- return nodeId;
+ public String getUri() {
+ return uri;
}
- public void setNodeId(String nodeId) {
- this.nodeId = nodeId;
+ public void setUri(String nodeId) {
+ this.uri = nodeId;
}
public ClusterStateManager getCluster() {
@@ -78,7 +95,7 @@
}
PageFile pageFile;
- String nodeId;
+ String uri;
ClusterStateManager cluster;
ReplicationMaster master;
@@ -88,9 +105,12 @@
private File tempReplicationDir;
+ private String brokerURI = "xbean:broker.xml";
+
public void start() throws Exception {
// The cluster will let us know about the cluster configuration,
// which lets us decide if we are going to be a slave or a master.
+ getStore().open();
cluster.addListener(this);
cluster.start();
}
@@ -98,6 +118,7 @@
public void stop() throws Exception {
cluster.removeListener(this);
cluster.stop();
+ getStore().close();
}
public void onClusterChange(ClusterState clusterState) {
@@ -110,10 +131,11 @@
LOG.info("Shutting down master due to cluster state change.");
master.stop();
master = null;
- // TODO: broker service does not support getting restarted once it's been stopped. :(
- // so at this point we need, to re-create the broker if we want to go back into slave
- // mode.
- brokerService.stopMaster();
+ brokerService.stop();
+ brokerService=null;
+ // Stopping the broker service actually stops the store too..
+ // so we need to open it back up.
+ getStore().open();
}
// If the slave service was not yet started.. start it up.
if (slave == null) {
@@ -132,9 +154,10 @@
// If the master service was not yet started.. start it up.
if (master == null) {
LOG.info("Starting Master.");
+ brokerService = createBrokerService();
+ brokerService.start();
master = new ReplicationMaster(this);
master.start();
- brokerService.startMaster();
}
master.onClusterChange(clusterState);
@@ -159,16 +182,26 @@
}
}
- public ClusterState getClusterState() {
+ public BrokerService getBrokerService() {
+ return brokerService;
+ }
+
+ private BrokerService createBrokerService() throws Exception {
+ BrokerService rc = BrokerFactory.createBroker(brokerURI );
+ rc.setPersistenceAdapter(getStore());
+ return rc;
+ }
+
+ public ClusterState getClusterState() {
return clusterState;
}
private boolean areWeTheSlave(ClusterState config) {
- return config.getSlaves().contains(nodeId);
+ return config.getSlaves().contains(uri);
}
private boolean areWeTheMaster(ClusterState config) {
- return nodeId.equals(config.getMaster());
+ return uri.equals(config.getMaster());
}
public File getReplicationFile(String fn) throws IOException {
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 18 06:30:35 2008
@@ -99,8 +99,6 @@
return;
}
- replicationServer.getStore().open();
-
transport = TransportFactory.connect(new URI(master));
transport.setTransportListener(this);
transport.start();
@@ -111,7 +109,7 @@
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
PBSlaveInit payload = new PBSlaveInit();
- payload.setNodeId(replicationServer.getNodeId());
+ payload.setNodeId(replicationServer.getUri());
// This call back is executed once the checkpoint is
// completed and all data has been
@@ -199,8 +197,6 @@
journalUpateFile=null;
}
journalUpdateFileId=0;
-
- replicationServer.getStore().close();
}
}
@@ -305,7 +301,6 @@
synchronized (transferMutex) {
LOG.info("Slave synhcronization complete, going online...");
-
replicationServer.getStore().close();
if( journalUpateFile!=null ) {
@@ -332,7 +327,6 @@
online=true;
replicationServer.getStore().open();
-
LOG.info("Slave is now online. We are now eligible to become the master.");
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Nov 18 06:30:35 2008
@@ -301,15 +301,17 @@
public void unload() throws IOException, InterruptedException {
synchronized (indexMutex) {
- metadata.state = CLOSED_STATE;
- metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
-
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- tx.store(metadata.page, metadataMarshaller, true);
- }
- });
- close();
+ if( pageFile.isLoaded() ) {
+ metadata.state = CLOSED_STATE;
+ metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ tx.store(metadata.page, metadataMarshaller, true);
+ }
+ });
+ close();
+ }
}
}
@@ -462,7 +464,11 @@
public JournalCommand load(Location location) throws IOException {
ByteSequence data = journal.read(location);
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
- KahaEntryType type = KahaEntryType.valueOf(is.readByte());
+ byte readByte = is.readByte();
+ KahaEntryType type = KahaEntryType.valueOf(readByte);
+ if( type == null ) {
+ throw new IOException("Could not load journal record. Invalid location: "+location);
+ }
JournalCommand message = (JournalCommand)type.createMessage();
message.mergeFramed(is);
return message;
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Tue Nov 18 06:30:35 2008
@@ -16,6 +16,7 @@
*/
package org.apache.kahadb.replication;
+import java.io.File;
import java.util.Arrays;
import javax.jms.Connection;
@@ -47,21 +48,19 @@
// This cluster object will control who becomes the master.
StaticClusterStateManager cluster = new StaticClusterStateManager();
- ReplicatedBrokerService b1 = new ReplicatedBrokerService();
- b1.addConnector(BROKER1_URI);
- b1.setDataDirectory("target/replication-test/broker1");
- b1.setBrokerName("broker1");
- b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
- b1.getReplicationServer().setCluster(cluster);
- b1.start();
-
- ReplicatedBrokerService b2 = new ReplicatedBrokerService();
- b2.addConnector(BROKER2_URI);
- b2.setDataDirectory("target/replication-test/broker2");
- b2.setBrokerName("broker2");
- b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
- b2.getReplicationServer().setCluster(cluster);
- b2.start();
+ ReplicationServer rs1 = new ReplicationServer();
+ rs1.setUri(BROKER1_REPLICATION_ID);
+ rs1.setCluster(cluster);
+ rs1.setDirectory(new File("target/replication-test/broker1"));
+ rs1.setBrokerURI("broker://("+BROKER1_URI+")/broker1");
+ rs1.start();
+
+ ReplicationServer rs2 = new ReplicationServer();
+ rs2.setUri(BROKER2_REPLICATION_ID);
+ rs2.setCluster(cluster);
+ rs2.setDirectory(new File("target/replication-test/broker2"));
+ rs2.setBrokerURI("broker://(" + BROKER2_URI + ")/broker2");
+ rs2.start();
// // None of the brokers should be accepting connections since they are not masters.
// try {
@@ -108,8 +107,8 @@
assertReceived(200, BROKER2_URI);
- b2.stop();
- b1.stop();
+ rs2.stop();
+ rs1.stop();
}
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Tue Nov 18 06:30:35 2008
@@ -16,12 +16,13 @@
*/
package org.apache.kahadb.store.perf;
+import java.io.File;
import java.util.Arrays;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleQueueTest;
import org.apache.kahadb.replication.ClusterState;
-import org.apache.kahadb.replication.ReplicatedBrokerService;
+import org.apache.kahadb.replication.ReplicationServer;
import org.apache.kahadb.replication.StaticClusterStateManager;
/**
@@ -30,13 +31,13 @@
public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest {
private StaticClusterStateManager cluster;
- private ReplicatedBrokerService b1;
- private ReplicatedBrokerService b2;
private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
protected String broker2BindAddress="tcp://localhost:61617";
+ private ReplicationServer rs1;
+ private ReplicationServer rs2;
@Override
protected BrokerService createBroker(String uri) throws Exception {
@@ -56,37 +57,33 @@
clusterState.setSlaves(Arrays.asList(slaves));
cluster.setClusterState(clusterState);
- b1 = new ReplicatedBrokerService();
- b1.setDeleteAllMessagesOnStartup(true);
- b1.addConnector(uri);
- b1.setUseShutdownHook(false);
-
- b1.setDataDirectory("target/test-amq-data/perfTest-b1/amqdb");
- b1.setBrokerName("broker1");
- b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
- b1.getReplicationServer().setCluster(cluster);
- b1.start();
-
- Thread.sleep(1000);
-
- b2 = new ReplicatedBrokerService();
- b2.addConnector(broker2BindAddress);
- b2.setDataDirectory("target/test-amq-data/perfTest-b2/amqdb");
- b2.setBrokerName("broker1");
- b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
- b2.getReplicationServer().setCluster(cluster);
- b2.start();
+ rs1 = new ReplicationServer();
+ rs1.setUri(BROKER1_REPLICATION_ID);
+ rs1.setCluster(cluster);
+ rs1.setDirectory(new File("target/replication-test/broker1"));
+ rs1.setBrokerURI("broker://("+uri+")/broker1");
+ rs1.start();
+
+ rs2 = new ReplicationServer();
+ rs2.setUri(BROKER2_REPLICATION_ID);
+ rs2.setCluster(cluster);
+ rs2.setDirectory(new File("target/replication-test/broker2"));
+ rs2.setBrokerURI("broker://(" + broker2BindAddress + ")/broker2");
+ rs2.start();
-
- return b1;
+ return rs1.getBrokerService();
}
@Override
protected void tearDown() throws Exception {
- if( b2!=null ) {
- b2.stop();
- b2 = null;
+ if( rs1!=null ) {
+ rs1.stop();
+ rs1 = null;
}
+ if( rs2!=null ) {
+ rs2.stop();
+ rs2 = null;
+ }
}
}