You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/03 00:48:50 UTC
svn commit: r1405218 -
/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java
Author: chirino
Date: Fri Nov 2 23:48:50 2012
New Revision: 1405218
URL: http://svn.apache.org/viewvc?rev=1405218&view=rev
Log:
Refactor out common logic to keep things dry.
Modified:
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java
Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java?rev=1405218&r1=1405217&r2=1405218&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java (original)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java Fri Nov 2 23:48:50 2012
@@ -43,11 +43,23 @@ import java.util.HashMap;
*/
public class StoreExporter {
+ static final int OPENWIRE_VERSION = 8;
+ static final boolean TIGHT_ENCODING = false;
+
URI config;
File file;
+ private ObjectMapper mapper = new ObjectMapper();
+ private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
+ private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
+ private final AsciiBuffer codec_id = new AsciiBuffer("openwire");
+ private final OpenWireFormat wireformat = new OpenWireFormat();
+
public StoreExporter() throws URISyntaxException {
config = new URI("xbean:activemq.xml");
+ wireformat.setCacheEnabled(false);
+ wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
+ wireformat.setVersion(OPENWIRE_VERSION);
}
public void execute() throws Exception {
@@ -76,19 +88,8 @@ public class StoreExporter {
}
}
- static final int OPENWIRE_VERSION = 8;
- static final boolean TIGHT_ENCODING = false;
-
void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- final AsciiBuffer ds_kind = new AsciiBuffer("ds");
- final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
- final AsciiBuffer codec_id = new AsciiBuffer("openwire");
- final OpenWireFormat wireformat = new OpenWireFormat();
- wireformat.setCacheEnabled(false);
- wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
- wireformat.setVersion(OPENWIRE_VERSION);
final long[] messageKeyCounter = new long[]{0};
final long[] containerKeyCounter = new long[]{0};
@@ -143,31 +144,12 @@ public class StoreExporter {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
- DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
- mos.writeBoolean(TIGHT_ENCODING);
- mos.writeVarInt(OPENWIRE_VERSION);
- wireformat.marshal(message, mos);
-
- MessagePB.Bean messageRecord = new MessagePB.Bean();
- messageRecord.setCodec(codec_id);
- messageRecord.setMessageKey(messageKeyCounter[0]);
- messageRecord.setSize(message.getSize());
- messageRecord.setValue(mos.toBuffer());
- // record.setCompression()
+ MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
- QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
- entryRecord.setQueueKey(containerKeyCounter[0]);
- entryRecord.setQueueSeq(seqKeyCounter[0]);
- entryRecord.setMessageKey(messageKeyCounter[0]);
- entryRecord.setSize(message.getSize());
- if (message.getExpiration() != 0) {
- entryRecord.setExpiration(message.getExpiration());
- }
- if (message.getRedeliveryCounter() != 0) {
- entryRecord.setRedeliveries(message.getRedeliveryCounter());
- }
+ QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
+
return true;
}
});
@@ -216,30 +198,10 @@ public class StoreExporter {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
- DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
- mos.writeBoolean(TIGHT_ENCODING);
- mos.writeVarInt(OPENWIRE_VERSION);
- wireformat.marshal(mos);
-
- MessagePB.Bean messageRecord = new MessagePB.Bean();
- messageRecord.setCodec(codec_id);
- messageRecord.setMessageKey(messageKeyCounter[0]);
- messageRecord.setSize(message.getSize());
- messageRecord.setValue(mos.toBuffer());
- // record.setCompression()
+ MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
- QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
- entryRecord.setQueueKey(containerKeyCounter[0]);
- entryRecord.setQueueSeq(seqKeyCounter[0]);
- entryRecord.setMessageKey(messageKeyCounter[0]);
- entryRecord.setSize(message.getSize());
- if (message.getExpiration() != 0) {
- entryRecord.setExpiration(message.getExpiration());
- }
- if (message.getRedeliveryCounter() != 0) {
- entryRecord.setRedeliveries(message.getRedeliveryCounter());
- }
+ QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
return true;
}
@@ -251,6 +213,35 @@ public class StoreExporter {
manager.finish();
}
+ private QueueEntryPB.Bean createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) {
+ QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
+ entryRecord.setQueueKey(queueKey);
+ entryRecord.setQueueSeq(queueSeq);
+ entryRecord.setMessageKey(messageKey);
+ entryRecord.setSize(message.getSize());
+ if (message.getExpiration() != 0) {
+ entryRecord.setExpiration(message.getExpiration());
+ }
+ if (message.getRedeliveryCounter() != 0) {
+ entryRecord.setRedeliveries(message.getRedeliveryCounter());
+ }
+ return entryRecord;
+ }
+
+ private MessagePB.Bean createMessagePB(Message message, long messageKey) throws IOException {
+ DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
+ mos.writeBoolean(TIGHT_ENCODING);
+ mos.writeVarInt(OPENWIRE_VERSION);
+ wireformat.marshal(message, mos);
+
+ MessagePB.Bean messageRecord = new MessagePB.Bean();
+ messageRecord.setCodec(codec_id);
+ messageRecord.setMessageKey(messageKey);
+ messageRecord.setSize(message.getSize());
+ messageRecord.setValue(mos.toBuffer());
+ return messageRecord;
+ }
+
public File getFile() {
return file;
}