You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/10/07 22:19:52 UTC
svn commit: r702615 -
/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Author: chirino
Date: Tue Oct 7 13:19:51 2008
New Revision: 702615
URL: http://svn.apache.org/viewvc?rev=702615&view=rev
Log:
updating to use new PB interfaces.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=702615&r1=702614&r2=702615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Tue Oct 7 13:19:51 2008
@@ -404,11 +404,11 @@
* durring a recovery process.
*/
public Location store(JournalCommand data, boolean sync) throws IOException {
- int size = data.serializedSize();
+ int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
- data.writeTo(os);
- Location location = asyncDataManager.write(os.getByteSequence(), sync);
+ data.writeFramed(os);
+ Location location = asyncDataManager.write(os.toByteSequence(), sync);
process(data, location);
if( !recovering ) {
metadata.lastUpdate = location;
@@ -428,7 +428,7 @@
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
KahaEntryType type = KahaEntryType.valueOf(is.readByte());
JournalCommand message = (JournalCommand)type.createMessage();
- message.mergeFrom(is);
+ message.mergeFramed(is);
return message;
}
@@ -867,16 +867,12 @@
public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
- byte[] ba = new byte[dataIn.readShort()];
- dataIn.readFully(ba);
- rc.mergeFrom(ba);
+ rc.mergeFramed((InputStream)dataIn);
return rc;
}
public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
- byte[] ba = object.toByteArray();
- dataOut.writeShort(ba.length);
- dataOut.write(ba);
+ object.writeFramed((OutputStream)dataOut);
}
}