You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/18 15:30:36 UTC

svn commit: r718615 - in /activemq/sandbox/kahadb: ./ src/main/java/org/apache/kahadb/replication/ src/main/java/org/apache/kahadb/store/ src/test/java/org/apache/kahadb/replication/ src/test/java/org/apache/kahadb/store/perf/

Author: chirino
Date: Tue Nov 18 06:30:35 2008
New Revision: 718615

URL: http://svn.apache.org/viewvc?rev=718615&view=rev
Log:
The ReplicationServer now uses a BrokerFactory to create and destroy broker instances.  This allows us to bring a master back online after it has been taken offline.

Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java
Modified:
    activemq/sandbox/kahadb/pom.xml
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java

Modified: activemq/sandbox/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/pom.xml?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/pom.xml (original)
+++ activemq/sandbox/kahadb/pom.xml Tue Nov 18 06:30:35 2008
@@ -89,6 +89,12 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-context</artifactId>
+      <version>2.5.5</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.0.0</version>

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 18 06:30:35 2008
@@ -69,7 +69,7 @@
 
 	public void start() throws Exception {
 		synchronized (serverMutex) {
-			server = TransportFactory.bind(new URI(replicationServer.getNodeId()));
+			server = TransportFactory.bind(new URI(replicationServer.getUri()));
 			server.setAcceptListener(new TransportAcceptListener() {
 				public void onAccept(Transport transport) {
 					try {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Tue Nov 18 06:30:35 2008
@@ -23,6 +23,9 @@
 import java.util.zip.Checksum;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.page.PageFile;
@@ -41,32 +44,46 @@
 
     private KahaDBStore store;
 
-	private ReplicatedBrokerService brokerService;
+	private BrokerService brokerService;
 
-	public ReplicationServer() {
-	}
-
-	public ReplicatedBrokerService getBrokerService() {
-		return brokerService;
-	}
+    private File directory = new File(IOHelper.getDefaultDataDirectory());
 
-	public void setBrokerService(ReplicatedBrokerService brokerService) {
-		this.brokerService = brokerService;
+	public ReplicationServer() {
 	}
 
 	public KahaDBStore getStore() {
+	    if( store == null ) {
+	        store = new KahaDBStore();
+	        store.setDirectory(directory);
+	    }
 		return store;
 	}
-	public void setStore(KahaDBStore store) {
+	public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public String getBrokerURI() {
+        return brokerURI;
+    }
+
+    public void setBrokerURI(String brokerURI) {
+        this.brokerURI = brokerURI;
+    }
+
+    public void setStore(KahaDBStore store) {
 		this.store = store;
 	}
 
-	public String getNodeId() {
-		return nodeId;
+	public String getUri() {
+		return uri;
 	}
 
-	public void setNodeId(String nodeId) {
-		this.nodeId = nodeId;
+	public void setUri(String nodeId) {
+		this.uri = nodeId;
 	}
 
 	public ClusterStateManager getCluster() {
@@ -78,7 +95,7 @@
 	}
 
 	PageFile pageFile;
-	String nodeId;
+	String uri;
 	ClusterStateManager cluster;
 
 	ReplicationMaster master;
@@ -88,9 +105,12 @@
 
 	private File tempReplicationDir;
 
+    private String brokerURI = "xbean:broker.xml";
+
 	public void start() throws Exception {
 		// The cluster will let us know about the cluster configuration,
 		// which lets us decide if we are going to be a slave or a master.
+        getStore().open();
 		cluster.addListener(this);
 		cluster.start();
 	}
@@ -98,6 +118,7 @@
 	public void stop() throws Exception {
 		cluster.removeListener(this);
 		cluster.stop();
+		getStore().close();
 	}
 
 	public void onClusterChange(ClusterState clusterState) {
@@ -110,10 +131,11 @@
 						LOG.info("Shutting down master due to cluster state change.");
 						master.stop();
 						master = null;
-						// TODO: broker service does not support getting restarted once it's been stopped. :(
-						// so at this point we need, to re-create the broker if we want to go back into slave 
-						// mode.
-						brokerService.stopMaster();
+						brokerService.stop();
+						brokerService=null;
+						// Stopping the broker service also stops the store,
+						// so we need to open it back up.
+						getStore().open();
 					}
 					// If the slave service was not yet started.. start it up.
 					if (slave == null) {
@@ -132,9 +154,10 @@
 					// If the master service was not yet started.. start it up.
 					if (master == null) {
 						LOG.info("Starting Master.");
+						brokerService = createBrokerService();
+                        brokerService.start();
 						master = new ReplicationMaster(this);
 						master.start();
-						brokerService.startMaster();
 					}
 					
 					master.onClusterChange(clusterState);					
@@ -159,16 +182,26 @@
 		}
 	}
 
-	public ClusterState getClusterState() {
+	public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    private BrokerService createBrokerService() throws Exception {
+	    BrokerService rc = BrokerFactory.createBroker(brokerURI );
+	    rc.setPersistenceAdapter(getStore());
+	    return rc;
+    }
+
+    public ClusterState getClusterState() {	    
 		return clusterState;
 	}
 
 	private boolean areWeTheSlave(ClusterState config) {
-		return config.getSlaves().contains(nodeId);
+		return config.getSlaves().contains(uri);
 	}
 	
 	private boolean areWeTheMaster(ClusterState config) {
-		return nodeId.equals(config.getMaster());
+		return uri.equals(config.getMaster());
 	}
 
 	public File getReplicationFile(String fn) throws IOException {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 18 06:30:35 2008
@@ -99,8 +99,6 @@
 				return;
 			}
 			
-			replicationServer.getStore().open();
-			
 			transport = TransportFactory.connect(new URI(master));
 			transport.setTransportListener(this);
 			transport.start();
@@ -111,7 +109,7 @@
 			ReplicationFrame frame = new ReplicationFrame();
 			frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
 			PBSlaveInit payload = new PBSlaveInit();
-			payload.setNodeId(replicationServer.getNodeId());
+			payload.setNodeId(replicationServer.getUri());
 			
 			// This call back is executed once the checkpoint is
 			// completed and all data has been
@@ -199,8 +197,6 @@
 				journalUpateFile=null;
 			}
 			journalUpdateFileId=0;
-			
-			replicationServer.getStore().close();
 		}
 	}
 
@@ -305,7 +301,6 @@
 			synchronized (transferMutex) {
 				
 				LOG.info("Slave synhcronization complete, going online...");
-
 				replicationServer.getStore().close();
 				
 				if( journalUpateFile!=null ) {
@@ -332,7 +327,6 @@
 				online=true;
 				
 				replicationServer.getStore().open();
-				
 				LOG.info("Slave is now online.  We are now eligible to become the master.");
 			}
 			

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Nov 18 06:30:35 2008
@@ -301,15 +301,17 @@
 	
     public void unload() throws IOException, InterruptedException {
         synchronized (indexMutex) {
-            metadata.state = CLOSED_STATE;
-            metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
-
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    tx.store(metadata.page, metadataMarshaller, true);
-                }
-            });
-            close();
+            if( pageFile.isLoaded() ) {
+                metadata.state = CLOSED_STATE;
+                metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+    
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        tx.store(metadata.page, metadataMarshaller, true);
+                    }
+                });
+                close();
+            }
         }
     }
 
@@ -462,7 +464,11 @@
     public JournalCommand load(Location location) throws IOException {
         ByteSequence data = journal.read(location);
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
-        KahaEntryType type = KahaEntryType.valueOf(is.readByte());
+        byte readByte = is.readByte();
+        KahaEntryType type = KahaEntryType.valueOf(readByte);
+        if( type == null ) {
+            throw new IOException("Could not load journal record. Invalid location: "+location);
+        }
         JournalCommand message = (JournalCommand)type.createMessage();
         message.mergeFramed(is);
         return message;

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Tue Nov 18 06:30:35 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.kahadb.replication;
 
+import java.io.File;
 import java.util.Arrays;
 
 import javax.jms.Connection;
@@ -47,21 +48,19 @@
 		// This cluster object will control who becomes the master.
 		StaticClusterStateManager cluster = new StaticClusterStateManager();
 		
-		ReplicatedBrokerService b1 = new ReplicatedBrokerService();
-		b1.addConnector(BROKER1_URI);
-		b1.setDataDirectory("target/replication-test/broker1");
-		b1.setBrokerName("broker1");
-		b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
-		b1.getReplicationServer().setCluster(cluster);
-		b1.start();
-		
-		ReplicatedBrokerService b2 = new ReplicatedBrokerService();
-		b2.addConnector(BROKER2_URI);
-		b2.setDataDirectory("target/replication-test/broker2");
-		b2.setBrokerName("broker2");
-		b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
-		b2.getReplicationServer().setCluster(cluster);
-		b2.start();
+		ReplicationServer rs1 = new ReplicationServer();
+		rs1.setUri(BROKER1_REPLICATION_ID);
+		rs1.setCluster(cluster);
+		rs1.setDirectory(new File("target/replication-test/broker1"));
+		rs1.setBrokerURI("broker://("+BROKER1_URI+")/broker1");
+		rs1.start();
+
+        ReplicationServer rs2 = new ReplicationServer();
+        rs2.setUri(BROKER2_REPLICATION_ID);
+        rs2.setCluster(cluster);
+        rs2.setDirectory(new File("target/replication-test/broker2"));
+        rs2.setBrokerURI("broker://(" + BROKER2_URI + ")/broker2");
+        rs2.start();
 		
 //		// None of the brokers should be accepting connections since they are not masters.
 //		try {
@@ -108,8 +107,8 @@
 		
 		assertReceived(200, BROKER2_URI);
 		
-		b2.stop();		
-		b1.stop();
+		rs2.stop();		
+		rs1.stop();
 		
 	}
 

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Tue Nov 18 06:30:35 2008
@@ -16,12 +16,13 @@
  */
 package org.apache.kahadb.store.perf;
 
+import java.io.File;
 import java.util.Arrays;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.perf.SimpleQueueTest;
 import org.apache.kahadb.replication.ClusterState;
-import org.apache.kahadb.replication.ReplicatedBrokerService;
+import org.apache.kahadb.replication.ReplicationServer;
 import org.apache.kahadb.replication.StaticClusterStateManager;
 
 /**
@@ -30,13 +31,13 @@
 public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest {
 
 	private StaticClusterStateManager cluster;
-	private ReplicatedBrokerService b1;
-	private ReplicatedBrokerService b2;
 	
 	private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
 	private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
 
     protected String broker2BindAddress="tcp://localhost:61617";
+    private ReplicationServer rs1;
+    private ReplicationServer rs2;
 
 	@Override
 	protected BrokerService createBroker(String uri) throws Exception {
@@ -56,37 +57,33 @@
 		clusterState.setSlaves(Arrays.asList(slaves));
 		cluster.setClusterState(clusterState);
 
-		b1 = new ReplicatedBrokerService();
-        b1.setDeleteAllMessagesOnStartup(true);
-        b1.addConnector(uri);
-        b1.setUseShutdownHook(false);
-
-		b1.setDataDirectory("target/test-amq-data/perfTest-b1/amqdb");
-		b1.setBrokerName("broker1");
-		b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
-		b1.getReplicationServer().setCluster(cluster);
-		b1.start();
-		
-		Thread.sleep(1000);
-		
-		b2 = new ReplicatedBrokerService();
-		b2.addConnector(broker2BindAddress);
-		b2.setDataDirectory("target/test-amq-data/perfTest-b2/amqdb");
-		b2.setBrokerName("broker1");
-		b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
-		b2.getReplicationServer().setCluster(cluster);
-		b2.start();
+        rs1 = new ReplicationServer();
+        rs1.setUri(BROKER1_REPLICATION_ID);
+        rs1.setCluster(cluster);
+        rs1.setDirectory(new File("target/replication-test/broker1"));
+        rs1.setBrokerURI("broker://("+uri+")/broker1");
+        rs1.start();
+
+        rs2 = new ReplicationServer();
+        rs2.setUri(BROKER2_REPLICATION_ID);
+        rs2.setCluster(cluster);
+        rs2.setDirectory(new File("target/replication-test/broker2"));
+        rs2.setBrokerURI("broker://(" + broker2BindAddress + ")/broker2");
+        rs2.start();
 
-		
-		return b1;
+		return rs1.getBrokerService();
 	}
 	
 	@Override
 	protected void tearDown() throws Exception {
-		if( b2!=null ) {
-			b2.stop();
-			b2 = null;
+		if( rs1!=null ) {
+			rs1.stop();
+			rs1 = null;
 		}
+        if( rs2!=null ) {
+            rs2.stop();
+            rs2 = null;
+        }
 	}
 	
 }