You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/10 16:03:31 UTC
svn commit: r1481013 - in /activemq/trunk:
activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
Author: chirino
Date: Fri May 10 14:03:31 2013
New Revision: 1481013
URL: http://svn.apache.org/r1481013
Log:
Fixes AMQ-4529: leveldb store NPEs when you send to a composite destination.
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java?rev=1481013&r1=1481012&r2=1481013&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java Fri May 10 14:03:31 2013
@@ -153,7 +153,7 @@ public class MessageId implements DataSt
MessageId copy = new MessageId(producerId, producerSequenceId);
copy.key = key;
copy.brokerSequenceId = brokerSequenceId;
- copy.dataLocator = new AtomicReference<Object>(dataLocator != null ? dataLocator.get() : null);
+ copy.dataLocator = dataLocator;
copy.entryLocator = entryLocator;
copy.plistLocator = plistLocator;
return copy;
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1481013&r1=1481012&r2=1481013&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Fri May 10 14:03:31 2013
@@ -1200,170 +1200,173 @@ class LevelDBClient(store: LevelDBStore)
def store(uows: Array[DelayableUOW]) {
retryUsingIndex {
log.appender { appender =>
+ val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>
+ write_uows(uows, appender, batch)
+ }
+ if( syncNeeded && sync ) {
+ appender.force
+ }
+ } // end of log.appender { block }
- var syncNeeded = false
- index.write(new WriteOptions, max_index_write_latency) { batch =>
-
- var write_message_total = 0L
- var write_enqueue_total = 0L
+ // now that data is logged.. locate message from the data in the logs
+ for( uow <- uows ) {
+ for((msg, action) <- uow.actions ){
+ val messageRecord = action.messageRecord
+ if (messageRecord != null) {
+ messageRecord.id.setDataLocator(messageRecord.locator)
+ }
+ }
+ }
+ }
+ }
- uows.foreach { uow =>
+ def write_uows(uows: Array[DelayableUOW], appender: RecordLog#LogAppender, batch: WriteBatch) = {
+ var syncNeeded = false
+ var write_message_total = 0L
+ var write_enqueue_total = 0L
+
+ for( uow <- uows ) {
+ for( (msg, action) <- uow.actions ) {
+ val messageRecord = action.messageRecord
+ var log_info: LogInfo = null
+ var dataLocator: DataLocator = null
+
+ if (messageRecord != null && messageRecord.locator == null) {
+ val start = System.nanoTime()
+ val p = appender.append(LOG_DATA, messageRecord.data)
+ log_info = p._2
+ dataLocator = DataLocator(p._1, messageRecord.data.length)
+ messageRecord.locator = dataLocator
+// println("msg: "+messageRecord.id+" -> "+dataLocator)
+ write_message_total += System.nanoTime() - start
+ }
- uow.actions.foreach { case (msg, action) =>
- val messageRecord = action.messageRecord
- var log_info:LogInfo = null
- var pos = -1L
- var dataLocator:DataLocator = null
-
- if (messageRecord != null && messageRecord.locator==null) {
- val start = System.nanoTime()
- val p = appender.append(LOG_DATA, messageRecord.data)
- pos = p._1
- log_info = p._2
- dataLocator = DataLocator(pos, messageRecord.data.length)
- messageRecord.locator = dataLocator
- write_message_total += System.nanoTime() - start
- }
-
-
- action.dequeues.foreach { entry =>
- val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
- val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
-
- if( dataLocator==null ) {
- dataLocator = entry.id.getDataLocator match {
- case x:DataLocator => x
- case x:MessageRecord => x.locator
- case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
- }
- }
-
- val log_record = new EntryRecord.Bean()
- log_record.setCollectionKey(entry.queueKey)
- log_record.setEntryKey(new Buffer(key, 9, 8))
- log_record.setValueLocation(dataLocator.pos)
- appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
-
- batch.delete(key)
- logRefDecrement(dataLocator.pos)
- collectionDecrementSize(entry.queueKey)
- }
-
- action.enqueues.foreach { entry =>
-
- if(dataLocator ==null ) {
- dataLocator = entry.id.getDataLocator match {
- case x:DataLocator => x
- case x:MessageRecord => x.locator
- case _ =>
- throw new RuntimeException("Unexpected locator type")
- }
- }
-
- val start = System.nanoTime()
-
- val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
-
- assert(entry.id.getDataLocator()!=null)
-
- val log_record = new EntryRecord.Bean()
- log_record.setCollectionKey(entry.queueKey)
- log_record.setEntryKey(new Buffer(key, 9, 8))
- log_record.setValueLocation(dataLocator.pos)
- log_record.setValueLength(dataLocator.len)
- appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
-
- val index_record = new EntryRecord.Bean()
- index_record.setValueLocation(dataLocator.pos)
- index_record.setValueLength(dataLocator.len)
- batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
-
- val log_data = encodeEntryRecord(log_record.freeze())
- val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
-
- appender.append(LOG_ADD_ENTRY, log_data)
- batch.put(key, index_data)
-
- for( key <- logRefKey(pos, log_info) ) {
- logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
- }
-
- collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
- write_enqueue_total += System.nanoTime() - start
- }
-
- action.xaAcks.foreach { entry:XaAckRecord =>
- val ack = entry.ack
- if( dataLocator==null ) {
- dataLocator = ack.getLastMessageId.getDataLocator match {
- case x:DataLocator => x
- case x:MessageRecord => x.locator
- case _ =>
- throw new RuntimeException("Unexpected locator type")
- }
- }
- println(dataLocator)
-
- val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
- val os = new DataByteArrayOutputStream()
- os.writeLong(dataLocator.pos)
- os.writeInt(dataLocator.len)
- os.writeLong(el.qid)
- os.writeLong(el.seq)
- os.writeLong(entry.sub)
- store.wireFormat.marshal(ack, os)
- var ack_encoded = os.toBuffer
-
- val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
- val log_record = new EntryRecord.Bean()
- log_record.setCollectionKey(entry.container)
- log_record.setEntryKey(new Buffer(key, 9, 8))
- log_record.setMeta(ack_encoded)
- appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
- val index_record = new EntryRecord.Bean()
- index_record.setMeta(ack_encoded)
- batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
- }
+ for( entry <- action.dequeues) {
+ val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
+ val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
+
+ if (dataLocator == null) {
+ dataLocator = entry.id.getDataLocator match {
+ case x: DataLocator => x
+ case x: MessageRecord => x.locator
+ case _ => throw new RuntimeException("Unexpected locator type: " + dataLocator)
}
+ }
- uow.subAcks.foreach { entry =>
- val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
- val log_record = new EntryRecord.Bean()
- log_record.setCollectionKey(entry.subKey)
- log_record.setEntryKey(ACK_POSITION)
- log_record.setValueLocation(entry.ackPosition)
- appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
-
- val index_record = new EntryRecord.Bean()
- index_record.setValueLocation(entry.ackPosition)
- batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
- }
+// println("deq: "+entry.id+" -> "+dataLocator)
+ val log_record = new EntryRecord.Bean()
+ log_record.setCollectionKey(entry.queueKey)
+ log_record.setEntryKey(new Buffer(key, 9, 8))
+ log_record.setValueLocation(dataLocator.pos)
+ appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+ batch.delete(key)
+ logRefDecrement(dataLocator.pos)
+ collectionDecrementSize(entry.queueKey)
+ }
+
+ for( entry<- action.enqueues) {
- if( !syncNeeded && uow.syncNeeded ) {
- syncNeeded = true
+ if (dataLocator == null) {
+ dataLocator = entry.id.getDataLocator match {
+ case x: DataLocator => x
+ case x: MessageRecord => x.locator
+ case _ =>
+ throw new RuntimeException("Unexpected locator type")
}
}
- max_write_message_latency.add(write_message_total)
- max_write_enqueue_latency.add(write_enqueue_total)
- }
- if( syncNeeded && sync ) {
- appender.force
+// println("enq: "+entry.id+" -> "+dataLocator)
+ val start = System.nanoTime()
+
+ val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
+
+ assert(entry.id.getDataLocator() != null)
+
+ val log_record = new EntryRecord.Bean()
+ log_record.setCollectionKey(entry.queueKey)
+ log_record.setEntryKey(new Buffer(key, 9, 8))
+ log_record.setValueLocation(dataLocator.pos)
+ log_record.setValueLength(dataLocator.len)
+ appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+ val index_record = new EntryRecord.Bean()
+ index_record.setValueLocation(dataLocator.pos)
+ index_record.setValueLength(dataLocator.len)
+ batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
+
+ val log_data = encodeEntryRecord(log_record.freeze())
+ val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
+
+ appender.append(LOG_ADD_ENTRY, log_data)
+ batch.put(key, index_data)
+
+ for (key <- logRefKey(dataLocator.pos, log_info)) {
+ logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
+ }
+
+ collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
+ write_enqueue_total += System.nanoTime() - start
}
- } // end of log.appender { block }
- // now that data is logged.. locate message from the data in the logs
- uows.foreach { uow =>
- uow.actions.foreach { case (msg, action) =>
- val messageRecord = action.messageRecord
- if (messageRecord != null) {
- messageRecord.id.setDataLocator(messageRecord.locator)
+ for( entry <- action.xaAcks ) {
+
+ val ack = entry.ack
+ if (dataLocator == null) {
+ dataLocator = ack.getLastMessageId.getDataLocator match {
+ case x: DataLocator => x
+ case x: MessageRecord => x.locator
+ case _ =>
+ throw new RuntimeException("Unexpected locator type")
+ }
}
+ println(dataLocator)
+
+ val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
+ val os = new DataByteArrayOutputStream()
+ os.writeLong(dataLocator.pos)
+ os.writeInt(dataLocator.len)
+ os.writeLong(el.qid)
+ os.writeLong(el.seq)
+ os.writeLong(entry.sub)
+ store.wireFormat.marshal(ack, os)
+ var ack_encoded = os.toBuffer
+
+ val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
+ val log_record = new EntryRecord.Bean()
+ log_record.setCollectionKey(entry.container)
+ log_record.setEntryKey(new Buffer(key, 9, 8))
+ log_record.setMeta(ack_encoded)
+ appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+ val index_record = new EntryRecord.Bean()
+ index_record.setMeta(ack_encoded)
+ batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
}
+
+ for( entry <- uow.subAcks ) {
+ val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
+ val log_record = new EntryRecord.Bean()
+ log_record.setCollectionKey(entry.subKey)
+ log_record.setEntryKey(ACK_POSITION)
+ log_record.setValueLocation(entry.ackPosition)
+ appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+ val index_record = new EntryRecord.Bean()
+ index_record.setValueLocation(entry.ackPosition)
+ batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
+ }
+
+ if (uow.syncNeeded) {
+ syncNeeded = true
+ }
}
+
+ max_write_message_latency.add(write_message_total)
+ max_write_enqueue_latency.add(write_enqueue_total)
+ syncNeeded
}
def getCollectionEntries(collectionKey: Long, firstSeq:Long, lastSeq:Long): Seq[(Buffer, EntryRecord.Buffer)] = {