You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/07 19:16:45 UTC

svn commit: r712217 [2/2] - in /activemq/sandbox/kahadb: ./ src/main/java/org/apache/kahadb/ha/ src/main/java/org/apache/kahadb/ha/command/ src/main/java/org/apache/kahadb/journal/ src/main/java/org/apache/kahadb/page/ src/main/java/org/apache/kahadb/r...

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri Nov  7 10:16:10 2008
@@ -28,9 +28,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.TreeMap;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,20 +38,19 @@
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.util.Callback;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.LongMarshaller;
 import org.apache.kahadb.Marshaller;
 import org.apache.kahadb.StringMarshaller;
 import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.index.BTreeNode;
 import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.page.Transaction.PageOverflowIOException;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -70,8 +67,6 @@
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
 
 public class MessageDatabase {
 
@@ -140,7 +135,7 @@
     }
 
     protected PageFile pageFile;
-    protected Journal asyncDataManager;
+	protected Journal journal;
     protected Metadata metadata = new Metadata();
 
     protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -175,15 +170,15 @@
     public void load() throws IOException {
 
         recovering=true;
-        if (asyncDataManager == null) {
-            asyncDataManager = createAsyncDataManager();
-        }
+        
+        // Creates the journal if it does not yet exist.
+        getJournal();
         if (failIfJournalIsLocked) {
-            asyncDataManager.lock();
+            journal.lock();
         } else {
             while (true) {
                 try {
-                    asyncDataManager.lock();
+                    journal.lock();
                     break;
                 } catch (IOException e) {
                     LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
@@ -194,16 +189,14 @@
                 }
             }
         }
-        if (pageFile == null) {
-            pageFile = createPageFile();
-        }
+        
+        // Creates the page file if it does not yet exist.
+        getPageFile();
 
-        asyncDataManager.start();
+        journal.start();
         if (deleteAllMessages) {
             pageFile.delete();
-            asyncDataManager.delete();
-
-            store(new KahaTraceCommand().setMessage("DELETED " + new Date()));
+            journal.delete();
 
             LOG.info("Persistence store purged.");
             deleteAllMessages = false;
@@ -269,11 +262,11 @@
                         Thread.sleep(sleepTime);
                         long now = System.currentTimeMillis();
                         if( now - lastCleanup >= cleanupInterval ) {
-                            checkpoint(true);
+                            checkpointCleanup(true);
                             lastCleanup = now;
                             lastCheckpoint = now;
                         } else if( now - lastCheckpoint >= checkpointInterval ) {
-                            checkpoint(false);
+                            checkpointCleanup(false);
                             lastCheckpoint = now;
                         }
                     }
@@ -305,7 +298,7 @@
             metadata = new Metadata();
         }
         store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
-        asyncDataManager.close();
+        journal.close();
     }
 
     /**
@@ -347,7 +340,7 @@
         // Perhaps there were no transactions...
         if( pos==null && metadata.lastUpdate!=null) {
             // Start replay at the record after the last one recorded in the index file.
-            pos = asyncDataManager.getNextLocation(metadata.lastUpdate);
+            pos = journal.getNextLocation(metadata.lastUpdate);
             // No journal records need to be recovered.
             if( pos == null ) {
                 return;
@@ -357,17 +350,17 @@
         // Do we need to start from the beginning?
         if (pos == null) {
             // This loads the first position.
-            pos = asyncDataManager.getNextLocation(null);
+            pos = journal.getNextLocation(null);
         }
 
         int redoCounter = 0;
-        LOG.info("Journal Recovery Started from: " + asyncDataManager + " at " + pos.getDataFileId() + ":" + pos.getOffset());
+        LOG.info("Journal Recovery Started from: " + journal + " at " + pos.getDataFileId() + ":" + pos.getOffset());
 
         while (pos != null) {
             JournalCommand message = load(pos);
             process(message, pos);
             redoCounter++;
-            pos = asyncDataManager.getNextLocation(pos);
+            pos = journal.getNextLocation(pos);
         }
 
         Location location = store(new KahaTraceCommand().setMessage("RECOVERED " + new Date()), true);
@@ -375,7 +368,7 @@
         LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
     }
 
-    private void checkpoint(final boolean cleanup) {
+    protected void checkpointCleanup(final boolean cleanup) {
         try {
             synchronized (indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -390,6 +383,23 @@
         }
     }
 
+    
+	public void checkpoint(Callback closure) throws Exception {
+        try {
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        checkpointUpdate(tx, false);
+                    }
+                });
+                pageFile.flush();
+                closure.execute();
+            }
+            store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
+        } catch (IOException e) {
+        }
+	}
+
     // /////////////////////////////////////////////////////////////////
     // Methods called by the broker to update and query the store.
     // /////////////////////////////////////////////////////////////////
@@ -408,7 +418,7 @@
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
         os.writeByte(data.type().getNumber());
         data.writeFramed(os);
-        Location location = asyncDataManager.write(os.toByteSequence(), sync);
+        Location location = journal.write(os.toByteSequence(), sync);
         process(data, location);
         if( !recovering ) {
             metadata.lastUpdate = location;
@@ -424,7 +434,7 @@
      * @throws IOException
      */
     public JournalCommand load(Location location) throws IOException {
-        ByteSequence data = asyncDataManager.read(location);
+        ByteSequence data = journal.read(location);
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         KahaEntryType type = KahaEntryType.valueOf(is.readByte());
         JournalCommand message = (JournalCommand)type.createMessage();
@@ -742,7 +752,7 @@
             }
             
             LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
-            asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
+            journal.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
         }
         
         LOG.debug("Checkpoint done.");
@@ -1110,7 +1120,7 @@
         return pf;
     }
 
-    private Journal createAsyncDataManager() {
+    private Journal createJournal() {
         Journal manager = new Journal();
         manager.setDirectory(new File(directory, "journal"));
         manager.setMaxFileLength(1024 * 1024 * 20);
@@ -1158,4 +1168,17 @@
         this.cleanupInterval = cleanupInterval;
     }
 
+    public PageFile getPageFile() {
+        if (pageFile == null) {
+            pageFile = createPageFile();
+        }
+		return pageFile;
+	}
+
+	public Journal getJournal() {
+        if (journal == null) {
+            journal = createJournal();
+        }
+		return journal;
+	}
 }

Added: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (added)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Fri Nov  7 10:16:10 2008
@@ -0,0 +1,99 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.kahadb.replication.pb;
+
+option java_multiple_files = true;
+option java_outer_classname = "PB";
+
+//
+// 
+//
+message PBHeader {
+    required PBType type=1;
+    optional int64 payload_size=2;    
+}
+
+enum PBType {
+
+	// Sent from the slave to the master when the slave first starts.  It lets the master
+	// know about the slave's synchronization state.  This allows the master to decide how to best synchronize
+	// the slave.
+	//    
+	// @followed-by PBSlaveInit	
+	SLAVE_INIT = 0;
+	
+	// The Master will send this response back to the slave, letting it know what it needs to do to get
+	// its data files synchronized with the master.
+	//    
+	// @followed-by PBSlaveInitResponse	
+	SLAVE_INIT_RESPONSE = 1;
+  
+	// Sent from the Master to the slave to replicate a Journal update.
+	//    
+	// @followed-by PBJournalUpdate	
+	JOURNAL_UPDATE=3;	
+	
+	// An ack sent from the Slave to a master to let the master know up to where in the journal the slave has
+	// synchronized to.  This acknowledges receipt of all previous journal records.  This should not be sent until
+	// all bulk file copies are complete.
+	//    
+	// @followed-by PBJournalLocation	
+	JOURNAL_UPDATE_ACK=4;
+	
+	// A Request for a bulk file transfer.  Sent from a slave to a Master
+	//    
+	// @followed-by PBFileInfo	
+	FILE_TRANSFER=5;
+
+	// A bulk file transfer response
+	//    
+	// @followed-by the bytes of the requested file. 	
+	FILE_TRANSFER_RESPONSE=6;
+}
+
+message PBFileInfo {
+    required string name=1;
+    optional int32 snapshot_id=2;
+    optional sfixed64 checksum=3;    
+    optional int64 start=4;    
+    optional int64 end=5;    
+}
+
+message PBJournalLocation {
+	required int32 file_id=1;
+	required int32 offset=2; 
+}
+
+message PBSlaveInit {
+    // The id of the slave node that is being initialized
+    required string node_id=1;
+    // The files that the slave node currently has
+	repeated PBFileInfo current_files=2;	
+}
+
+message PBSlaveInitResponse {
+	// The files that the slave should bulk copy from the master.
+	repeated PBFileInfo copy_files=1;
+	// The files that the slave should delete
+	repeated string delete_files=2;
+}
+
+message PBJournalUpdate {
+    required PBJournalLocation location=1;
+    required bytes data=2;
+}
+

Added: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr (added)
+++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr Fri Nov  7 10:16:10 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRTransportFactory

Added: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr (added)
+++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr Fri Nov  7 10:16:10 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRWireFormatFactory

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class ReplicationTest extends TestCase {
+
+	
+	private static final String BROKER1_URI = "tcp://localhost:61001";
+	private static final String BROKER2_URI = "tcp://localhost:61002";
+
+	private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
+	private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
+	
+	private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+
+	public void testReplication() throws Exception {
+		
+		// This cluster object will control who becomes the master.
+		StaticClusterStateManager cluster = new StaticClusterStateManager();
+		
+		ReplicatedBrokerService b1 = new ReplicatedBrokerService();
+		b1.addConnector(BROKER1_URI);
+		b1.setDataDirectory("target/replication-test/broker1");
+		b1.setBrokerName("broker1");
+		b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
+		b1.getReplicationServer().setCluster(cluster);
+		b1.start();
+		
+		ReplicatedBrokerService b2 = new ReplicatedBrokerService();
+		b2.addConnector(BROKER2_URI);
+		b2.setDataDirectory("target/replication-test/broker2");
+		b2.setBrokerName("broker2");
+		b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
+		b2.getReplicationServer().setCluster(cluster);
+		b2.start();
+		
+//		// None of the brokers should be accepting connections since they are not masters.
+//		try {
+//			sendMesagesTo(1, BROKER1_URI);
+//			fail("Connection failure expected.");
+//		} catch( JMSException e ) {
+//		}
+		
+		// Make b1 the master.
+		ClusterState clusterState = new ClusterState();
+		clusterState.setMaster(BROKER1_REPLICATION_ID);
+		cluster.setClusterState(clusterState);
+		
+		try {
+			sendMesagesTo(500, BROKER1_URI);
+		} catch( JMSException e ) {
+			fail("b1 did not become a master.");
+		}
+		
+		// Make broker 2 a slave.
+		clusterState = new ClusterState();
+		clusterState.setMaster(BROKER1_REPLICATION_ID);
+		String[] slaves = {BROKER2_REPLICATION_ID};
+		clusterState.setSlaves(Arrays.asList(slaves));
+		cluster.setClusterState(clusterState);
+		
+		Thread.sleep(10000);
+		
+		b2.stop();		
+		b1.stop();
+		
+	}
+
+	private void sendMesagesTo(int count, String brokerUri) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(destination);
+			for (int i = 0; i < count; i++) {
+				producer.send(session.createTextMessage("Hello: "+i));
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+	
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBType;
+
+public class KDBRTransportTest extends TestCase {
+
+	private static final String KDBR_URI = "kdbr://localhost:61618";
+	private List<Object> serverQueue;
+	private List<Object> clientQueue;
+	private List<Transport> serverTransports;
+	private TransportServer server;
+	private Transport client;
+	
+	private Object commandLatchMutex = new Object();
+	private CountDownLatch commandLatch;
+	
+	protected void releaseCommandLatch() {
+		synchronized( commandLatchMutex ) {
+			if( commandLatch == null ) {
+				return;
+			}
+			commandLatch.countDown();
+			commandLatch=null;
+		}
+	}
+	
+	protected CountDownLatch getCommandLatch() {
+		synchronized( commandLatchMutex ) {
+			if( commandLatch == null ) {
+				commandLatch = new CountDownLatch(1);
+			}
+			return commandLatch;
+		}
+	}
+	
+	@Override
+	protected void setUp() throws Exception {
+		serverQueue = Collections.synchronizedList(new ArrayList<Object>()); 
+		clientQueue = Collections.synchronizedList(new ArrayList<Object>()); 
+		serverTransports = Collections.synchronizedList(new ArrayList<Transport>()); 
+		
+		// Setup a server
+		server = TransportFactory.bind(new URI(KDBR_URI));
+		server.setAcceptListener(new TransportAcceptListener() {
+			public void onAccept(Transport transport) {
+				try {
+					transport.setTransportListener(new TransportListener() {
+						public void onCommand(Object command) {
+							try {
+								serverQueue.add(command);
+								process(command);
+								releaseCommandLatch();
+							} catch (IOException e) {
+								onException(e);
+							}
+						}
+
+						public void onException(IOException error) {
+							serverQueue.add(error);
+							serverTransports.remove(this);
+							releaseCommandLatch();
+						}
+
+						public void transportInterupted() {
+						}
+
+						public void transportResumed() {
+						}
+					});
+					transport.start();
+					serverTransports.add(transport);
+				} catch (Exception e) {
+					onAcceptError(e);
+				}
+			}
+
+			public void onAcceptError(Exception error) {
+				error.printStackTrace();
+			}
+		});
+		server.start();
+
+		// Connect a client.
+		client = TransportFactory.connect(new URI(KDBR_URI));
+		client.setTransportListener(new TransportListener() {
+			public void onCommand(Object command) {
+				clientQueue.add(command);
+				releaseCommandLatch();
+			}
+
+			public void onException(IOException error) {
+				clientQueue.add(error);
+				releaseCommandLatch();
+			}
+
+			public void transportInterupted() {
+			}
+
+			public void transportResumed() {
+			}
+		});
+		client.start();	
+	}
+	
+	@Override
+	protected void tearDown() throws Exception {
+		client.stop();
+		server.stop();
+	}
+
+	private void process(Object command) throws IOException {		
+		ReplicationFrame frame = (ReplicationFrame) command;
+		// Since we are processing the commands async in this test case, we need to fully read the stream before
+		// returning, since it will be used to read the next command once we return.
+		if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) {
+			InputStream ais = (InputStream) frame.getPayload();
+			byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()];
+			readFully(ais, actualPayload);
+			frame.setPayload(actualPayload);
+		}
+	}
+	
+	/**
+	 * Test a frame that has a streaming payload.
+	 * 
+	 * @throws Exception
+	 */
+	public void testFileTransferResponse() throws Exception {
+
+		byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10}; 
+
+		ReplicationFrame expected = new ReplicationFrame();
+		expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
+		ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
+		expected.setPayload(is);
+		
+		CountDownLatch latch = getCommandLatch();
+		client.oneway(expected);
+		is.close();
+		latch.await(2, TimeUnit.SECONDS);
+		
+		assertEquals(1, serverQueue.size());
+		ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+		
+		assertEquals(expected.getHeader(), actual.getHeader());		
+		assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload()));
+		
+	}
+
+	
+	/**
+	 * Test out sending a frame that has a PB payload.
+	 * 
+	 * @throws Exception
+	 */
+	public void testPBSlaveInitFrame() throws Exception {
+
+
+		ReplicationFrame expected = new ReplicationFrame();
+		expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+		expected.setPayload(new PBSlaveInit().setNodeId("foo"));
+		
+		CountDownLatch latch = getCommandLatch();
+		client.oneway(expected);
+		latch.await(2, TimeUnit.SECONDS);
+		
+		assertEquals(1, serverQueue.size());
+		ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+		
+		assertEquals(expected.getHeader(), actual.getHeader());
+		assertEquals(expected.getPayload(), actual.getPayload());
+		
+	}
+
+
+	private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
+		int pos = 0;
+		int c;
+		while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) {
+			pos += c;
+		}
+		if( pos  < actualPayload.length ) {
+			throw new EOFException();
+		}
+	}
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.File;
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+
+/**
+ * Once the wire format is completed we can test against real persistence storage.
+ * 
+ * @version $Revision$
+ */
+public class KahaDBStoreBrokerTest extends BrokerTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+    
+    public static Test suite() {
+        return suite(KahaDBStoreBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+
+/**
+ * Used to verify that recovery works correctly against the KahaDB store.
+ * 
+ * @version $Revision$
+ */
+public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+    public static Test suite() {
+        return suite(KahaDBStoreRecoveryBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    
+    public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Set up the producer and send the messages.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        
+        ArrayList<String> expected = new ArrayList<String>();
+        
+        int MESSAGE_COUNT = 10000;
+        for(int i=0; i < MESSAGE_COUNT; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+            expected.add(message.getMessageId().toString());
+        }
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Set up the consumer and receive the first half of the messages.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+        producerInfo = createProducerInfo(sessionInfo);
+        connection.send(producerInfo);
+
+        for(int i=0; i < MESSAGE_COUNT/2; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull("Should have received message "+expected.get(0)+" by now!", m);
+            assertEquals(expected.remove(0), m.getMessageId().toString());
+            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+            connection.send(ack);
+        }
+        
+        connection.request(closeConnectionInfo(connectionInfo));
+        
+        // restart the broker.
+        restartBroker();
+
+        // Set up a new consumer and receive the remaining half of the messages.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        for(int i=0; i < MESSAGE_COUNT/2; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull("Should have received message "+expected.get(i)+" by now!", m);
+            assertEquals(expected.get(i), m.getMessageId().toString());
+            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+            connection.send(ack);
+            
+            
+        }
+        
+        connection.request(closeConnectionInfo(connectionInfo));
+    }
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java Fri Nov  7 10:16:10 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.File;
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.XARecoveryBrokerTest;
+
+/**
+ * Used to verify that XA recovery works correctly against the KahaDB store.
+ * 
+ * @version $Revision$
+ */
+public class KahaDBStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    public static Test suite() {
+        return suite(KahaDBStoreXARecoveryBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+}

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java Fri Nov  7 10:16:10 2008
@@ -41,7 +41,7 @@
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
+import org.apache.kahadb.store.KahaDBStore;
 
 /**
  * This tests bulk loading and unloading of messages to a Queue.s
@@ -56,7 +56,7 @@
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
-        KahaDBPersistenceAdaptor kaha = new KahaDBPersistenceAdaptor();
+        KahaDBStore kaha = new KahaDBStore();
         kaha.setDirectory(new File("target/activemq-data/kahadb"));
         // kaha.deleteAllMessages();
         broker.setPersistenceAdapter(kaha);

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java Fri Nov  7 10:16:10 2008
@@ -19,7 +19,7 @@
 import java.io.File;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.perf.SimpleDurableTopicTest;
-import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
+import org.apache.kahadb.store.KahaDBStore;
 
 /**
  * @version $Revision: 1.3 $
@@ -31,7 +31,7 @@
         dataFileDir.mkdirs();
         // answer.setDeleteAllMessagesOnStartup(true);
                
-         KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
+         KahaDBStore adaptor = new KahaDBStore();
          adaptor.setDirectory(dataFileDir);
          
         

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java Fri Nov  7 10:16:10 2008
@@ -19,7 +19,7 @@
 import java.io.File;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.perf.SimpleQueueTest;
-import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
+import org.apache.kahadb.store.KahaDBStore;
 
 /**
  * @version $Revision: 1.3 $
@@ -31,7 +31,7 @@
         dataFileDir.mkdirs();
         answer.setDeleteAllMessagesOnStartup(true);
                
-         KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
+         KahaDBStore adaptor = new KahaDBStore();
          adaptor.setDirectory(dataFileDir);