You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/10 16:03:31 UTC

svn commit: r1481013 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/command/MessageId.java activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala

Author: chirino
Date: Fri May 10 14:03:31 2013
New Revision: 1481013

URL: http://svn.apache.org/r1481013
Log:
Fixes AMQ-4529: the LevelDB store throws a NullPointerException when a message is sent to a composite destination.

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java?rev=1481013&r1=1481012&r2=1481013&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java Fri May 10 14:03:31 2013
@@ -153,7 +153,7 @@ public class MessageId implements DataSt
         MessageId copy = new MessageId(producerId, producerSequenceId);
         copy.key = key;
         copy.brokerSequenceId = brokerSequenceId;
-        copy.dataLocator = new AtomicReference<Object>(dataLocator != null ? dataLocator.get() : null);
+        copy.dataLocator = dataLocator;
         copy.entryLocator = entryLocator;
         copy.plistLocator = plistLocator;
         return copy;

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1481013&r1=1481012&r2=1481013&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Fri May 10 14:03:31 2013
@@ -1200,170 +1200,173 @@ class LevelDBClient(store: LevelDBStore)
   def store(uows: Array[DelayableUOW]) {
     retryUsingIndex {
       log.appender { appender =>
+        val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>
+          write_uows(uows, appender, batch)
+        }
+        if( syncNeeded && sync ) {
+          appender.force
+        }
+      } // end of log.appender { block }
 
-        var syncNeeded = false
-        index.write(new WriteOptions, max_index_write_latency) { batch =>
-
-          var write_message_total = 0L
-          var write_enqueue_total = 0L
+      // now that data is logged.. locate message from the data in the logs
+      for( uow <- uows ) {
+        for((msg, action) <- uow.actions ){
+          val messageRecord = action.messageRecord
+          if (messageRecord != null) {
+            messageRecord.id.setDataLocator(messageRecord.locator)
+          }
+        }
+      }
+    }
+  }
 
-          uows.foreach { uow =>
 
+  def write_uows(uows: Array[DelayableUOW], appender: RecordLog#LogAppender, batch: WriteBatch) = {
+    var syncNeeded = false
+    var write_message_total = 0L
+    var write_enqueue_total = 0L
+
+    for( uow <- uows ) {
+      for( (msg, action) <- uow.actions ) {
+        val messageRecord = action.messageRecord
+        var log_info: LogInfo = null
+        var dataLocator: DataLocator = null
+
+        if (messageRecord != null && messageRecord.locator == null) {
+          val start = System.nanoTime()
+          val p = appender.append(LOG_DATA, messageRecord.data)
+          log_info = p._2
+          dataLocator = DataLocator(p._1, messageRecord.data.length)
+          messageRecord.locator = dataLocator
+//          println("msg: "+messageRecord.id+" -> "+dataLocator)
+          write_message_total += System.nanoTime() - start
+        }
 
-            uow.actions.foreach { case (msg, action) =>
-              val messageRecord = action.messageRecord
-              var log_info:LogInfo = null
-              var pos = -1L
-              var dataLocator:DataLocator = null
-
-              if (messageRecord != null && messageRecord.locator==null) {
-                val start = System.nanoTime()
-                val p = appender.append(LOG_DATA, messageRecord.data)
-                pos = p._1
-                log_info = p._2
-                dataLocator = DataLocator(pos, messageRecord.data.length)
-                messageRecord.locator = dataLocator
-                write_message_total += System.nanoTime() - start
-              }
-
-
-              action.dequeues.foreach { entry =>
-                val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
-                val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
-
-                if( dataLocator==null ) {
-                  dataLocator = entry.id.getDataLocator match {
-                    case x:DataLocator => x
-                    case x:MessageRecord => x.locator
-                    case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
-                  }
-                }
-
-                val log_record = new EntryRecord.Bean()
-                log_record.setCollectionKey(entry.queueKey)
-                log_record.setEntryKey(new Buffer(key, 9, 8))
-                log_record.setValueLocation(dataLocator.pos)
-                appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
-
-                batch.delete(key)
-                logRefDecrement(dataLocator.pos)
-                collectionDecrementSize(entry.queueKey)
-              }
-
-              action.enqueues.foreach { entry =>
-                
-                if(dataLocator ==null ) {
-                  dataLocator = entry.id.getDataLocator match {
-                    case x:DataLocator => x
-                    case x:MessageRecord => x.locator
-                    case _ =>
-                      throw new RuntimeException("Unexpected locator type")
-                  }
-                }
-
-                val start = System.nanoTime()
-
-                val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
-
-                assert(entry.id.getDataLocator()!=null)
-
-                val log_record = new EntryRecord.Bean()
-                log_record.setCollectionKey(entry.queueKey)
-                log_record.setEntryKey(new Buffer(key, 9, 8))
-                log_record.setValueLocation(dataLocator.pos)
-                log_record.setValueLength(dataLocator.len)
-                appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
-
-                val index_record = new EntryRecord.Bean()
-                index_record.setValueLocation(dataLocator.pos)
-                index_record.setValueLength(dataLocator.len)
-                batch.put(key,  encodeEntryRecord(index_record.freeze()).toByteArray)
-
-                val log_data = encodeEntryRecord(log_record.freeze())
-                val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
-
-                appender.append(LOG_ADD_ENTRY, log_data)
-                batch.put(key, index_data)
-
-                for( key <- logRefKey(pos, log_info) ) {
-                  logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
-                }
-
-                collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
-                write_enqueue_total += System.nanoTime() - start
-              }
-
-              action.xaAcks.foreach { entry:XaAckRecord =>
-                val ack = entry.ack
-                if( dataLocator==null ) {
-                  dataLocator = ack.getLastMessageId.getDataLocator match {
-                    case x:DataLocator => x
-                    case x:MessageRecord => x.locator
-                    case _ =>
-                      throw new RuntimeException("Unexpected locator type")
-                  }
-                }
-                println(dataLocator)
-
-                val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
-                val os = new DataByteArrayOutputStream()
-                os.writeLong(dataLocator.pos)
-                os.writeInt(dataLocator.len)
-                os.writeLong(el.qid)
-                os.writeLong(el.seq)
-                os.writeLong(entry.sub)
-                store.wireFormat.marshal(ack, os)
-                var ack_encoded = os.toBuffer
-
-                val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
-                val log_record = new EntryRecord.Bean()
-                log_record.setCollectionKey(entry.container)
-                log_record.setEntryKey(new Buffer(key, 9, 8))
-                log_record.setMeta(ack_encoded)
-                appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
-                val index_record = new EntryRecord.Bean()
-                index_record.setMeta(ack_encoded)
-                batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
-              }
 
+        for( entry <- action.dequeues) {
+          val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
+          val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
+
+          if (dataLocator == null) {
+            dataLocator = entry.id.getDataLocator match {
+              case x: DataLocator => x
+              case x: MessageRecord => x.locator
+              case _ => throw new RuntimeException("Unexpected locator type: " + dataLocator)
             }
+          }
 
-            uow.subAcks.foreach { entry =>
-              val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
-              val log_record = new EntryRecord.Bean()
-              log_record.setCollectionKey(entry.subKey)
-              log_record.setEntryKey(ACK_POSITION)
-              log_record.setValueLocation(entry.ackPosition)
-              appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
-
-              val index_record = new EntryRecord.Bean()
-              index_record.setValueLocation(entry.ackPosition)
-              batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
-            }
+//          println("deq: "+entry.id+" -> "+dataLocator)
+          val log_record = new EntryRecord.Bean()
+          log_record.setCollectionKey(entry.queueKey)
+          log_record.setEntryKey(new Buffer(key, 9, 8))
+          log_record.setValueLocation(dataLocator.pos)
+          appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+          batch.delete(key)
+          logRefDecrement(dataLocator.pos)
+          collectionDecrementSize(entry.queueKey)
+        }
+
+        for( entry<- action.enqueues) {
 
-            if( !syncNeeded && uow.syncNeeded ) {
-              syncNeeded = true
+          if (dataLocator == null) {
+            dataLocator = entry.id.getDataLocator match {
+              case x: DataLocator => x
+              case x: MessageRecord => x.locator
+              case _ =>
+                throw new RuntimeException("Unexpected locator type")
             }
           }
 
-          max_write_message_latency.add(write_message_total)
-          max_write_enqueue_latency.add(write_enqueue_total)
-        }
-        if( syncNeeded && sync ) {
-          appender.force
+//          println("enq: "+entry.id+" -> "+dataLocator)
+          val start = System.nanoTime()
+
+          val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
+
+          assert(entry.id.getDataLocator() != null)
+
+          val log_record = new EntryRecord.Bean()
+          log_record.setCollectionKey(entry.queueKey)
+          log_record.setEntryKey(new Buffer(key, 9, 8))
+          log_record.setValueLocation(dataLocator.pos)
+          log_record.setValueLength(dataLocator.len)
+          appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+          val index_record = new EntryRecord.Bean()
+          index_record.setValueLocation(dataLocator.pos)
+          index_record.setValueLength(dataLocator.len)
+          batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
+
+          val log_data = encodeEntryRecord(log_record.freeze())
+          val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
+
+          appender.append(LOG_ADD_ENTRY, log_data)
+          batch.put(key, index_data)
+
+          for (key <- logRefKey(dataLocator.pos, log_info)) {
+            logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
+          }
+
+          collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
+          write_enqueue_total += System.nanoTime() - start
         }
-      } // end of log.appender { block }
 
-      // now that data is logged.. locate message from the data in the logs
-      uows.foreach { uow =>
-        uow.actions.foreach { case (msg, action) =>
-          val messageRecord = action.messageRecord
-          if (messageRecord != null) {
-            messageRecord.id.setDataLocator(messageRecord.locator)
+        for( entry <- action.xaAcks ) {
+
+          val ack = entry.ack
+          if (dataLocator == null) {
+            dataLocator = ack.getLastMessageId.getDataLocator match {
+              case x: DataLocator => x
+              case x: MessageRecord => x.locator
+              case _ =>
+                throw new RuntimeException("Unexpected locator type")
+            }
           }
+          println(dataLocator)
+
+          val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
+          val os = new DataByteArrayOutputStream()
+          os.writeLong(dataLocator.pos)
+          os.writeInt(dataLocator.len)
+          os.writeLong(el.qid)
+          os.writeLong(el.seq)
+          os.writeLong(entry.sub)
+          store.wireFormat.marshal(ack, os)
+          var ack_encoded = os.toBuffer
+
+          val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
+          val log_record = new EntryRecord.Bean()
+          log_record.setCollectionKey(entry.container)
+          log_record.setEntryKey(new Buffer(key, 9, 8))
+          log_record.setMeta(ack_encoded)
+          appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+          val index_record = new EntryRecord.Bean()
+          index_record.setMeta(ack_encoded)
+          batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
         }
       }
+
+      for( entry <- uow.subAcks ) {
+        val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
+        val log_record = new EntryRecord.Bean()
+        log_record.setCollectionKey(entry.subKey)
+        log_record.setEntryKey(ACK_POSITION)
+        log_record.setValueLocation(entry.ackPosition)
+        appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+        val index_record = new EntryRecord.Bean()
+        index_record.setValueLocation(entry.ackPosition)
+        batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
+      }
+
+      if (uow.syncNeeded) {
+        syncNeeded = true
+      }
     }
+
+    max_write_message_latency.add(write_message_total)
+    max_write_enqueue_latency.add(write_enqueue_total)
+    syncNeeded
   }
 
   def getCollectionEntries(collectionKey: Long, firstSeq:Long, lastSeq:Long): Seq[(Buffer, EntryRecord.Buffer)] = {