You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/07 19:16:45 UTC
svn commit: r712217 [2/2] - in /activemq/sandbox/kahadb: ./
src/main/java/org/apache/kahadb/ha/
src/main/java/org/apache/kahadb/ha/command/
src/main/java/org/apache/kahadb/journal/
src/main/java/org/apache/kahadb/page/ src/main/java/org/apache/kahadb/r...
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri Nov 7 10:16:10 2008
@@ -28,9 +28,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,20 +38,19 @@
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.util.Callback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.LongMarshaller;
import org.apache.kahadb.Marshaller;
import org.apache.kahadb.StringMarshaller;
import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.index.BTreeNode;
import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.page.Transaction.PageOverflowIOException;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaDestination;
@@ -70,8 +67,6 @@
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
public class MessageDatabase {
@@ -140,7 +135,7 @@
}
protected PageFile pageFile;
- protected Journal asyncDataManager;
+ protected Journal journal;
protected Metadata metadata = new Metadata();
protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -175,15 +170,15 @@
public void load() throws IOException {
recovering=true;
- if (asyncDataManager == null) {
- asyncDataManager = createAsyncDataManager();
- }
+
+ // Creates the journal if it does not yet exist.
+ getJournal();
if (failIfJournalIsLocked) {
- asyncDataManager.lock();
+ journal.lock();
} else {
while (true) {
try {
- asyncDataManager.lock();
+ journal.lock();
break;
} catch (IOException e) {
LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
@@ -194,16 +189,14 @@
}
}
}
- if (pageFile == null) {
- pageFile = createPageFile();
- }
+
+ // Creates the page file if it does not yet exist.
+ getPageFile();
- asyncDataManager.start();
+ journal.start();
if (deleteAllMessages) {
pageFile.delete();
- asyncDataManager.delete();
-
- store(new KahaTraceCommand().setMessage("DELETED " + new Date()));
+ journal.delete();
LOG.info("Persistence store purged.");
deleteAllMessages = false;
@@ -269,11 +262,11 @@
Thread.sleep(sleepTime);
long now = System.currentTimeMillis();
if( now - lastCleanup >= cleanupInterval ) {
- checkpoint(true);
+ checkpointCleanup(true);
lastCleanup = now;
lastCheckpoint = now;
} else if( now - lastCheckpoint >= checkpointInterval ) {
- checkpoint(false);
+ checkpointCleanup(false);
lastCheckpoint = now;
}
}
@@ -305,7 +298,7 @@
metadata = new Metadata();
}
store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
- asyncDataManager.close();
+ journal.close();
}
/**
@@ -347,7 +340,7 @@
// Perhaps there were no transactions...
if( pos==null && metadata.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file.
- pos = asyncDataManager.getNextLocation(metadata.lastUpdate);
+ pos = journal.getNextLocation(metadata.lastUpdate);
// No journal records need to be recovered.
if( pos == null ) {
return;
@@ -357,17 +350,17 @@
// Do we need to start from the beginning?
if (pos == null) {
// This loads the first position.
- pos = asyncDataManager.getNextLocation(null);
+ pos = journal.getNextLocation(null);
}
int redoCounter = 0;
- LOG.info("Journal Recovery Started from: " + asyncDataManager + " at " + pos.getDataFileId() + ":" + pos.getOffset());
+ LOG.info("Journal Recovery Started from: " + journal + " at " + pos.getDataFileId() + ":" + pos.getOffset());
while (pos != null) {
JournalCommand message = load(pos);
process(message, pos);
redoCounter++;
- pos = asyncDataManager.getNextLocation(pos);
+ pos = journal.getNextLocation(pos);
}
Location location = store(new KahaTraceCommand().setMessage("RECOVERED " + new Date()), true);
@@ -375,7 +368,7 @@
LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
}
- private void checkpoint(final boolean cleanup) {
+ protected void checkpointCleanup(final boolean cleanup) {
try {
synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -390,6 +383,23 @@
}
}
+
+    /**
+     * Performs an index checkpoint and runs a caller-supplied callback while the
+     * index lock is held.
+     * <p>
+     * Under {@code indexMutex} this calls {@code checkpointUpdate(tx, false)} inside a
+     * page file transaction, flushes the page file and then executes {@code closure}.
+     * Once the lock has been released a "CHECKPOINT" trace command is passed to
+     * {@code store(...)}.
+     *
+     * @param closure callback executed after the page file flush, still under the index lock
+     * @throws Exception if the index update, the flush, the callback or the trace write fails.
+     *         I/O failures are propagated to the caller rather than being silently dropped,
+     *         so a caller never mistakes a failed checkpoint for a completed one.
+     */
+    public void checkpoint(Callback closure) throws Exception {
+        synchronized (indexMutex) {
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    checkpointUpdate(tx, false);
+                }
+            });
+            pageFile.flush();
+            closure.execute();
+        }
+        store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
+    }
+
// /////////////////////////////////////////////////////////////////
// Methods call by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
@@ -408,7 +418,7 @@
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeFramed(os);
- Location location = asyncDataManager.write(os.toByteSequence(), sync);
+ Location location = journal.write(os.toByteSequence(), sync);
process(data, location);
if( !recovering ) {
metadata.lastUpdate = location;
@@ -424,7 +434,7 @@
* @throws IOException
*/
public JournalCommand load(Location location) throws IOException {
- ByteSequence data = asyncDataManager.read(location);
+ ByteSequence data = journal.read(location);
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
KahaEntryType type = KahaEntryType.valueOf(is.readByte());
JournalCommand message = (JournalCommand)type.createMessage();
@@ -742,7 +752,7 @@
}
LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
- asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
+ journal.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
}
LOG.debug("Checkpoint done.");
@@ -1110,7 +1120,7 @@
return pf;
}
- private Journal createAsyncDataManager() {
+ private Journal createJournal() {
Journal manager = new Journal();
manager.setDirectory(new File(directory, "journal"));
manager.setMaxFileLength(1024 * 1024 * 20);
@@ -1158,4 +1168,17 @@
this.cleanupInterval = cleanupInterval;
}
+    /**
+     * Returns the page file, building it with {@code createPageFile()} the first
+     * time it is requested.
+     *
+     * @return the page file held by this database
+     */
+    public PageFile getPageFile() {
+        if (pageFile != null) {
+            return pageFile;
+        }
+        pageFile = createPageFile();
+        return pageFile;
+    }
+
+    /**
+     * Returns the journal, building it with {@code createJournal()} the first
+     * time it is requested.
+     *
+     * @return the journal held by this database
+     */
+    public Journal getJournal() {
+        if (journal != null) {
+            return journal;
+        }
+        journal = createJournal();
+        return journal;
+    }
}
Added: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (added)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Fri Nov 7 10:16:10 2008
@@ -0,0 +1,99 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.kahadb.replication.pb;
+
+option java_multiple_files = true;
+option java_outer_classname = "PB";
+
+//
+//
+//
+message PBHeader {
+ required PBType type=1;
+ optional int64 payload_size=2;
+}
+
+enum PBType {
+
+ // Sent from the slave to the master when the slave first starts. It lets the master
+ // know about the slave's synchronization state. This allows the master to decide how to best synchronize
+ // the slave.
+ //
+ // @followed-by PBSlaveInit
+ SLAVE_INIT = 0;
+
+ // The Master will send this response back to the slave, letting it know what it needs to do to get
+ // its data files synchronized with the master.
+ //
+ // @followed-by PBSlaveInitResponse
+ SLAVE_INIT_RESPONSE = 1;
+
+ // Sent from the Master to the slave to replicate a Journal update.
+ //
+ // @followed-by PBJournalUpdate
+ JOURNAL_UPDATE=3;
+
+ // An ack sent from the Slave to a master to let the master know up to where in the journal the slave has
+ // synchronized to. This acknowledges receipt of all previous journal records. This should not be sent until
+ // all bulk file copies are complete.
+ //
+ // @followed-by PBJournalLocation
+ JOURNAL_UPDATE_ACK=4;
+
+ // A Request for a bulk file transfer. Sent from a slave to a Master
+ //
+ // @followed-by PBFileInfo
+ FILE_TRANSFER=5;
+
+ // A bulk file transfer response
+ //
+ // @followed-by the bytes of the requested file.
+ FILE_TRANSFER_RESPONSE=6;
+}
+
+message PBFileInfo {
+ required string name=1;
+ optional int32 snapshot_id=2;
+ optional sfixed64 checksum=3;
+ optional int64 start=4;
+ optional int64 end=5;
+}
+
+message PBJournalLocation {
+ required int32 file_id=1;
+ required int32 offset=2;
+}
+
+message PBSlaveInit {
+ // The id of the slave node that is being initialized
+ required string node_id=1;
+ // The files that the slave node currently has
+ repeated PBFileInfo current_files=2;
+}
+
+message PBSlaveInitResponse {
+ // The files that the slave should bulk copy from the master.
+ repeated PBFileInfo copy_files=1;
+ // The files that the slave should delete
+ repeated string delete_files=2;
+}
+
+message PBJournalUpdate {
+ required PBJournalLocation location=1;
+ required bytes data=2;
+}
+
Added: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr (added)
+++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr Fri Nov 7 10:16:10 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRTransportFactory
Added: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr (added)
+++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr Fri Nov 7 10:16:10 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRWireFormatFactory
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Nov 7 10:16:10 2008
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Starts two {@code ReplicatedBrokerService} instances that share one
+ * {@code StaticClusterStateManager}, elects the first one master, sends messages to it
+ * and then registers the second one as a slave.
+ */
+public class ReplicationTest extends TestCase {
+
+    private static final String BROKER1_URI = "tcp://localhost:61001";
+    private static final String BROKER2_URI = "tcp://localhost:61002";
+
+    private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
+    private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
+
+    private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+
+    /**
+     * Drives the master election / slave attachment scenario.
+     * <p>
+     * Both brokers are stopped in {@code finally} blocks so that a failed assertion or an
+     * unexpected exception does not leave them running (and their ports bound) for the
+     * tests that follow.
+     *
+     * @throws Exception if a broker fails to start or stop, or the wait is interrupted
+     */
+    public void testReplication() throws Exception {
+
+        // This cluster object will control who becomes the master.
+        StaticClusterStateManager cluster = new StaticClusterStateManager();
+
+        ReplicatedBrokerService b1 = new ReplicatedBrokerService();
+        b1.addConnector(BROKER1_URI);
+        b1.setDataDirectory("target/replication-test/broker1");
+        b1.setBrokerName("broker1");
+        b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
+        b1.getReplicationServer().setCluster(cluster);
+        b1.start();
+        try {
+            ReplicatedBrokerService b2 = new ReplicatedBrokerService();
+            b2.addConnector(BROKER2_URI);
+            b2.setDataDirectory("target/replication-test/broker2");
+            b2.setBrokerName("broker2");
+            b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
+            b2.getReplicationServer().setCluster(cluster);
+            b2.start();
+            try {
+//                // None of the brokers should be accepting connections since they are not masters.
+//                try {
+//                    sendMesagesTo(1, BROKER1_URI);
+//                    fail("Connection failure expected.");
+//                } catch( JMSException e ) {
+//                }
+
+                // Make b1 the master.
+                ClusterState clusterState = new ClusterState();
+                clusterState.setMaster(BROKER1_REPLICATION_ID);
+                cluster.setClusterState(clusterState);
+
+                try {
+                    sendMesagesTo(500, BROKER1_URI);
+                } catch( JMSException e ) {
+                    fail("b1 did not become a master.");
+                }
+
+                // Make broker 2 a slave.
+                clusterState = new ClusterState();
+                clusterState.setMaster(BROKER1_REPLICATION_ID);
+                String[] slaves = {BROKER2_REPLICATION_ID};
+                clusterState.setSlaves(Arrays.asList(slaves));
+                cluster.setClusterState(clusterState);
+
+                // NOTE(review): presumably this gives the slave time to synchronize;
+                // nothing is asserted afterwards - confirm the intended check.
+                Thread.sleep(10000);
+            } finally {
+                // Same shutdown order as before: slave first, then master.
+                b2.stop();
+            }
+        } finally {
+            b1.stop();
+        }
+
+    }
+
+    /**
+     * Sends {@code count} text messages ("Hello: 0", "Hello: 1", ...) to the test queue
+     * through a fresh connection to {@code brokerUri}.
+     *
+     * @param count     number of messages to send
+     * @param brokerUri URI of the broker to connect to
+     * @throws JMSException if connecting, creating the session/producer or sending fails
+     */
+    private void sendMesagesTo(int count, String brokerUri) throws JMSException {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+        Connection con = cf.createConnection();
+        try {
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < count; i++) {
+                producer.send(session.createTextMessage("Hello: "+i));
+            }
+        } finally {
+            // Best-effort close: a failure here must not mask the outcome of the send.
+            try { con.close(); } catch (Throwable ignored) {}
+        }
+    }
+
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java Fri Nov 7 10:16:10 2008
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBType;
+
+/**
+ * Binds a transport server on {@code KDBR_URI}, connects a client to it and checks that
+ * {@link ReplicationFrame}s sent by the client are received unchanged by the server.
+ * Everything received (commands and errors) is recorded in {@code serverQueue} /
+ * {@code clientQueue}.
+ */
+public class KDBRTransportTest extends TestCase {
+
+    private static final String KDBR_URI = "kdbr://localhost:61618";
+    private List<Object> serverQueue;
+    private List<Object> clientQueue;
+    private List<Transport> serverTransports;
+    private TransportServer server;
+    private Transport client;
+
+    private Object commandLatchMutex = new Object();
+    private CountDownLatch commandLatch;
+
+    /**
+     * Counts down the current command latch, if there is one, and clears it so that the
+     * next {@link #getCommandLatch()} call creates a fresh latch.
+     */
+    protected void releaseCommandLatch() {
+        synchronized( commandLatchMutex ) {
+            if( commandLatch == null ) {
+                return;
+            }
+            commandLatch.countDown();
+            commandLatch=null;
+        }
+    }
+
+    /**
+     * Returns the current command latch, creating a single-count latch when none exists.
+     *
+     * @return the latch that the next received command or error will release
+     */
+    protected CountDownLatch getCommandLatch() {
+        synchronized( commandLatchMutex ) {
+            if( commandLatch == null ) {
+                commandLatch = new CountDownLatch(1);
+            }
+            return commandLatch;
+        }
+    }
+
+    /**
+     * Starts the server side (recording every accepted transport and everything it
+     * receives) and then connects and starts one client.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        serverQueue = Collections.synchronizedList(new ArrayList<Object>());
+        clientQueue = Collections.synchronizedList(new ArrayList<Object>());
+        serverTransports = Collections.synchronizedList(new ArrayList<Transport>());
+
+        // Setup a server
+        server = TransportFactory.bind(new URI(KDBR_URI));
+        server.setAcceptListener(new TransportAcceptListener() {
+            public void onAccept(final Transport transport) {
+                try {
+                    transport.setTransportListener(new TransportListener() {
+                        public void onCommand(Object command) {
+                            try {
+                                serverQueue.add(command);
+                                process(command);
+                                releaseCommandLatch();
+                            } catch (IOException e) {
+                                onException(e);
+                            }
+                        }
+
+                        public void onException(IOException error) {
+                            serverQueue.add(error);
+                            // Remove the failed transport itself; "this" is the listener and
+                            // is never an element of the transport list.
+                            serverTransports.remove(transport);
+                            releaseCommandLatch();
+                        }
+
+                        public void transportInterupted() {
+                        }
+
+                        public void transportResumed() {
+                        }
+                    });
+                    transport.start();
+                    serverTransports.add(transport);
+                } catch (Exception e) {
+                    onAcceptError(e);
+                }
+            }
+
+            public void onAcceptError(Exception error) {
+                error.printStackTrace();
+            }
+        });
+        server.start();
+
+        // Connect a client.
+        client = TransportFactory.connect(new URI(KDBR_URI));
+        client.setTransportListener(new TransportListener() {
+            public void onCommand(Object command) {
+                clientQueue.add(command);
+                releaseCommandLatch();
+            }
+
+            public void onException(IOException error) {
+                clientQueue.add(error);
+                releaseCommandLatch();
+            }
+
+            public void transportInterupted() {
+            }
+
+            public void transportResumed() {
+            }
+        });
+        client.start();
+    }
+
+    /**
+     * Stops the client and then the server. The server is stopped even when stopping the
+     * client throws, so the listening port is not left bound for the next test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            client.stop();
+        } finally {
+            server.stop();
+        }
+    }
+
+    /**
+     * Server-side handling of a received frame: for a FILE_TRANSFER_RESPONSE the streamed
+     * payload is read completely and replaced by the resulting byte array.
+     *
+     * @param command the received command; cast to {@link ReplicationFrame}
+     * @throws IOException if the payload stream ends early or cannot be read
+     */
+    private void process(Object command) throws IOException {
+        ReplicationFrame frame = (ReplicationFrame) command;
+        // Since we are processing the commands async in this test case we need to fully read the
+        // stream before returning, since it will be used to read the next command once we return.
+        if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) {
+            InputStream ais = (InputStream) frame.getPayload();
+            byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()];
+            readFully(ais, actualPayload);
+            frame.setPayload(actualPayload);
+        }
+    }
+
+    /**
+     * Test a frame that has a streaming payload.
+     *
+     * @throws Exception
+     */
+    public void testFileTransferResponse() throws Exception {
+
+        byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10};
+
+        ReplicationFrame expected = new ReplicationFrame();
+        expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
+        ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
+        expected.setPayload(is);
+
+        CountDownLatch latch = getCommandLatch();
+        client.oneway(expected);
+        is.close();
+        // Report a timeout as a timeout rather than as a confusing queue-size mismatch.
+        assertTrue("Timed out waiting for the server to receive the frame", latch.await(2, TimeUnit.SECONDS));
+
+        assertEquals(1, serverQueue.size());
+        ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+
+        assertEquals(expected.getHeader(), actual.getHeader());
+        assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload()));
+
+    }
+
+
+    /**
+     * Test out sending a frame that has a PB payload.
+     *
+     * @throws Exception
+     */
+    public void testPBSlaveInitFrame() throws Exception {
+
+
+        ReplicationFrame expected = new ReplicationFrame();
+        expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+        expected.setPayload(new PBSlaveInit().setNodeId("foo"));
+
+        CountDownLatch latch = getCommandLatch();
+        client.oneway(expected);
+        // Report a timeout as a timeout rather than as a confusing queue-size mismatch.
+        assertTrue("Timed out waiting for the server to receive the frame", latch.await(2, TimeUnit.SECONDS));
+
+        assertEquals(1, serverQueue.size());
+        ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);
+
+        assertEquals(expected.getHeader(), actual.getHeader());
+        assertEquals(expected.getPayload(), actual.getPayload());
+
+    }
+
+
+    /**
+     * Fills {@code actualPayload} completely from {@code ais}.
+     *
+     * @param ais           stream to read from
+     * @param actualPayload buffer to fill; its length is the number of bytes required
+     * @throws EOFException if the stream ends before the buffer is full
+     * @throws IOException  if reading fails
+     */
+    private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
+        int pos = 0;
+        int c;
+        while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) {
+            pos += c;
+        }
+        if( pos < actualPayload.length ) {
+            throw new EOFException();
+        }
+    }
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java Fri Nov 7 10:16:10 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.File;
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+
+/**
+ * Runs the inherited {@link BrokerTest} cases against brokers whose persistence adapter
+ * is a {@link KahaDBStore} kept under {@code target/activemq-data/kahadb}.
+ * <p>
+ * Original note: once the wire format is completed we can test against real persistence storage.
+ *
+ * @version $Revision$
+ */
+public class KahaDBStoreBrokerTest extends BrokerTest {
+
+    /**
+     * Creates a broker whose KahaDB store has had {@code deleteAllMessages()} called on it.
+     */
+    protected BrokerService createBroker() throws Exception {
+        return newBroker(true);
+    }
+
+    /**
+     * Creates a broker over the same store directory without calling
+     * {@code deleteAllMessages()}.
+     */
+    protected BrokerService createRestartedBroker() throws Exception {
+        return newBroker(false);
+    }
+
+    /**
+     * Shared construction path for both factory methods.
+     *
+     * @param purge when {@code true}, {@code deleteAllMessages()} is called on the store
+     *              before it is handed to the broker
+     * @return a broker configured with the KahaDB persistence adapter
+     */
+    private BrokerService newBroker(boolean purge) throws Exception {
+        BrokerService service = new BrokerService();
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(new File("target/activemq-data/kahadb"));
+        if (purge) {
+            store.deleteAllMessages();
+        }
+        service.setPersistenceAdapter(store);
+        return service;
+    }
+
+
+    /** Builds the JUnit suite for this class. */
+    public static Test suite() {
+        return suite(KahaDBStoreBrokerTest.class);
+    }
+
+    /** Runs the suite with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java Fri Nov 7 10:16:10 2008
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+
+/**
+ * Used to verify that recovery works correctly against the KahaDBStore persistence adapter.
+ *
+ * @version $Revision$
+ */
+public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ KahaDBStore kaha = new KahaDBStore();
+ kaha.setDirectory(new File("target/activemq-data/kahadb"));
+ kaha.deleteAllMessages();
+ broker.setPersistenceAdapter(kaha);
+ return broker;
+ }
+
+ protected BrokerService createRestartedBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ KahaDBStore kaha = new KahaDBStore();
+ kaha.setDirectory(new File("target/activemq-data/kahadb"));
+ broker.setPersistenceAdapter(kaha);
+ return broker;
+ }
+
+ public static Test suite() {
+ return suite(KahaDBStoreRecoveryBrokerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+
+ public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ ArrayList<String> expected = new ArrayList<String>();
+
+ int MESSAGE_COUNT = 10000;
+ for(int i=0; i < MESSAGE_COUNT; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ expected.add(message.getMessageId().toString());
+ }
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // restart the broker.
+ restartBroker();
+
+ // Setup the consumer and receive the message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+ producerInfo = createProducerInfo(sessionInfo);
+ connection.send(producerInfo);
+
+ for(int i=0; i < MESSAGE_COUNT/2; i++) {
+ Message m = receiveMessage(connection);
+ assertNotNull("Should have received message "+expected.get(0)+" by now!", m);
+ assertEquals(expected.remove(0), m.getMessageId().toString());
+ MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+ connection.send(ack);
+ }
+
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // restart the broker.
+ restartBroker();
+
+ // Setup the consumer and receive the message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ for(int i=0; i < MESSAGE_COUNT/2; i++) {
+ Message m = receiveMessage(connection);
+ assertNotNull("Should have received message "+expected.get(i)+" by now!", m);
+ assertEquals(expected.get(i), m.getMessageId().toString());
+ MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+ connection.send(ack);
+
+
+ }
+
+ connection.request(closeConnectionInfo(connectionInfo));
+ }
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java?rev=712217&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java Fri Nov 7 10:16:10 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.File;
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.XARecoveryBrokerTest;
+
+/**
+ * Used to verify that XA recovery works correctly against the KahaDBStore persistence adapter.
+ *
+ * @version $Revision$
+ */
+public class KahaDBStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+ public static Test suite() {
+ return suite(KahaDBStoreXARecoveryBrokerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ KahaDBStore kaha = new KahaDBStore();
+ kaha.setDirectory(new File("target/activemq-data/kahadb"));
+ kaha.deleteAllMessages();
+ broker.setPersistenceAdapter(kaha);
+ return broker;
+ }
+
+ protected BrokerService createRestartedBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ KahaDBStore kaha = new KahaDBStore();
+ kaha.setDirectory(new File("target/activemq-data/kahadb"));
+ broker.setPersistenceAdapter(kaha);
+ return broker;
+ }
+
+}
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java Fri Nov 7 10:16:10 2008
@@ -41,7 +41,7 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
+import org.apache.kahadb.store.KahaDBStore;
/**
* This tests bulk loading and unloading of messages to a Queue.s
@@ -56,7 +56,7 @@
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
- KahaDBPersistenceAdaptor kaha = new KahaDBPersistenceAdaptor();
+ KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File("target/activemq-data/kahadb"));
// kaha.deleteAllMessages();
broker.setPersistenceAdapter(kaha);
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java Fri Nov 7 10:16:10 2008
@@ -19,7 +19,7 @@
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleDurableTopicTest;
-import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
+import org.apache.kahadb.store.KahaDBStore;
/**
* @version $Revision: 1.3 $
@@ -31,7 +31,7 @@
dataFileDir.mkdirs();
// answer.setDeleteAllMessagesOnStartup(true);
- KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
+ KahaDBStore adaptor = new KahaDBStore();
adaptor.setDirectory(dataFileDir);
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java?rev=712217&r1=712216&r2=712217&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java Fri Nov 7 10:16:10 2008
@@ -19,7 +19,7 @@
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleQueueTest;
-import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
+import org.apache.kahadb.store.KahaDBStore;
/**
* @version $Revision: 1.3 $
@@ -31,7 +31,7 @@
dataFileDir.mkdirs();
answer.setDeleteAllMessagesOnStartup(true);
- KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
+ KahaDBStore adaptor = new KahaDBStore();
adaptor.setDirectory(dataFileDir);