You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/10 00:17:48 UTC

svn commit: r1480811 - in /activemq/trunk/activemq-leveldb-store/src: main/scala/org/apache/activemq/leveldb/ test/scala/org/apache/activemq/leveldb/test/

Author: chirino
Date: Thu May  9 22:17:47 2013
New Revision: 1480811

URL: http://svn.apache.org/r1480811
Log:
Fixes AMQ-4251: Scala compile warnings - Compiling activemq-leveldb-store

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Thu May  9 22:17:47 2013
@@ -34,10 +34,11 @@ import org.apache.activemq.util.ByteSequ
 import util.TimeMetric
 import scala.Some
 
+case class EntryLocator(qid:Long, seq:Long)
+case class DataLocator(pos:Long, len:Int)
 case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
-  var locator:(Long, Int) = _
+  var locator:DataLocator = _
 }
-
 case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
 case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
 case class QueueEntryRange()
@@ -266,13 +267,13 @@ class DelayableUOW(val manager:DBManager
         record
       case record:MessageRecord =>
         record
-      case x:(Long, Int) =>
+      case x:DataLocator =>
         null
     }
 
     val entry = QueueEntryRecord(id, queueKey, queueSeq)
     assert(id.getEntryLocator == null)
-    id.setEntryLocator((queueKey, queueSeq))
+    id.setEntryLocator(EntryLocator(queueKey, queueSeq))
 
     val a = this.synchronized {
       if( !delay )
@@ -293,7 +294,7 @@ class DelayableUOW(val manager:DBManager
   }
 
   def dequeue(expectedQueueKey:Long, id:MessageId) = {
-    val (queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[(Long, Long)];
+    val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
     assert(queueKey == expectedQueueKey)
     val entry = QueueEntryRecord(id, queueKey, queueSeq)
     this.synchronized {
@@ -670,7 +671,7 @@ class DBManager(val parent:LevelDBStore)
   }
 
   def queuePosition(id: MessageId):Long = {
-    id.getEntryLocator.asInstanceOf[(Long, Long)]._2
+    id.getEntryLocator.asInstanceOf[EntryLocator].seq
   }
 
   def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu May  9 22:17:47 2013
@@ -1094,9 +1094,9 @@ class LevelDBClient(store: LevelDBStore)
   def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
     collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
       val seq = decodeLong(key)
-      var locator = (value.getValueLocation, value.getValueLength)
+      var locator = DataLocator(value.getValueLocation, value.getValueLength)
       val msg = getMessage(locator)
-      msg.getMessageId().setEntryLocator((collectionKey, seq))
+      msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
       msg.getMessageId().setDataLocator(locator)
       func(msg)
     }
@@ -1114,14 +1114,14 @@ class LevelDBClient(store: LevelDBStore)
         val seq = is.readLong()
         val sub = is.readLong()
         val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
-        ack.getLastMessageId.setDataLocator((log, offset))
-        ack.getLastMessageId.setEntryLocator((qid, seq))
+        ack.getLastMessageId.setDataLocator(DataLocator(log, offset))
+        ack.getLastMessageId.setEntryLocator(EntryLocator(qid, seq))
 
         func(XaAckRecord(collectionKey, seq, ack, sub))
       } else {
-        var locator = (value.getValueLocation, value.getValueLength)
+        var locator = DataLocator(value.getValueLocation, value.getValueLength)
         val msg = getMessage(locator)
-        msg.getMessageId().setEntryLocator((collectionKey, seq))
+        msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
         msg.getMessageId().setDataLocator(locator)
         func(msg)
       }
@@ -1143,7 +1143,7 @@ class LevelDBClient(store: LevelDBStore)
       case x:MessageRecord =>
         // Encoded form is still in memory..
         Some(x.data)
-      case (pos:Long, len:Int) =>
+      case DataLocator(pos, len) =>
         // Load the encoded form from disk.
         log.read(pos, len).map(new Buffer(_))
     }
@@ -1214,26 +1214,26 @@ class LevelDBClient(store: LevelDBStore)
               val messageRecord = action.messageRecord
               var log_info:LogInfo = null
               var pos = -1L
-              var dataLocator:(Long, Int) = null
+              var dataLocator:DataLocator = null
 
               if (messageRecord != null && messageRecord.locator==null) {
                 val start = System.nanoTime()
                 val p = appender.append(LOG_DATA, messageRecord.data)
                 pos = p._1
                 log_info = p._2
-                dataLocator = (pos, messageRecord.data.length)
+                dataLocator = DataLocator(pos, messageRecord.data.length)
                 messageRecord.locator = dataLocator
                 write_message_total += System.nanoTime() - start
               }
 
 
               action.dequeues.foreach { entry =>
-                val keyLocation = entry.id.getEntryLocator.asInstanceOf[(Long, Long)]
-                val key = encodeEntryKey(ENTRY_PREFIX, keyLocation._1, keyLocation._2)
+                val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
+                val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
 
                 if( dataLocator==null ) {
                   dataLocator = entry.id.getDataLocator match {
-                    case x:(Long, Int) => x
+                    case x:DataLocator => x
                     case x:MessageRecord => x.locator
                     case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
                   }
@@ -1242,11 +1242,11 @@ class LevelDBClient(store: LevelDBStore)
                 val log_record = new EntryRecord.Bean()
                 log_record.setCollectionKey(entry.queueKey)
                 log_record.setEntryKey(new Buffer(key, 9, 8))
-                log_record.setValueLocation(dataLocator._1)
+                log_record.setValueLocation(dataLocator.pos)
                 appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
 
                 batch.delete(key)
-                logRefDecrement(dataLocator._1)
+                logRefDecrement(dataLocator.pos)
                 collectionDecrementSize(entry.queueKey)
               }
 
@@ -1254,7 +1254,7 @@ class LevelDBClient(store: LevelDBStore)
                 
                 if(dataLocator ==null ) {
                   dataLocator = entry.id.getDataLocator match {
-                    case x:(Long, Int) => x
+                    case x:DataLocator => x
                     case x:MessageRecord => x.locator
                     case _ =>
                       throw new RuntimeException("Unexpected locator type")
@@ -1270,13 +1270,13 @@ class LevelDBClient(store: LevelDBStore)
                 val log_record = new EntryRecord.Bean()
                 log_record.setCollectionKey(entry.queueKey)
                 log_record.setEntryKey(new Buffer(key, 9, 8))
-                log_record.setValueLocation(dataLocator._1)
-                log_record.setValueLength(dataLocator._2)
+                log_record.setValueLocation(dataLocator.pos)
+                log_record.setValueLength(dataLocator.len)
                 appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
 
                 val index_record = new EntryRecord.Bean()
-                index_record.setValueLocation(dataLocator._1)
-                index_record.setValueLength(dataLocator._2)
+                index_record.setValueLocation(dataLocator.pos)
+                index_record.setValueLength(dataLocator.len)
                 batch.put(key,  encodeEntryRecord(index_record.freeze()).toByteArray)
 
                 val log_data = encodeEntryRecord(log_record.freeze())
@@ -1297,7 +1297,7 @@ class LevelDBClient(store: LevelDBStore)
                 val ack = entry.ack
                 if( dataLocator==null ) {
                   dataLocator = ack.getLastMessageId.getDataLocator match {
-                    case x:(Long, Int) => x
+                    case x:DataLocator => x
                     case x:MessageRecord => x.locator
                     case _ =>
                       throw new RuntimeException("Unexpected locator type")
@@ -1305,12 +1305,12 @@ class LevelDBClient(store: LevelDBStore)
                 }
                 println(dataLocator)
 
-                val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
+                val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
                 val os = new DataByteArrayOutputStream()
-                os.writeLong(dataLocator._1)
-                os.writeInt(dataLocator._2)
-                os.writeLong(qid)
-                os.writeLong(seq)
+                os.writeLong(dataLocator.pos)
+                os.writeInt(dataLocator.len)
+                os.writeLong(el.qid)
+                os.writeLong(el.seq)
                 os.writeLong(entry.sub)
                 store.wireFormat.marshal(ack, os)
                 var ack_encoded = os.toBuffer

Modified: activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala Thu May  9 22:17:47 2013
@@ -19,7 +19,6 @@ package org.apache.activemq.leveldb.test
 import org.apache.activemq.ActiveMQConnectionFactory
 import javax.jms.{Destination, ConnectionFactory}
 import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
-import org.apache.activemq.leveldb.test.JMSClientScenario
 
 /**
  * <p>

Modified: activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala Thu May  9 22:17:47 2013
@@ -18,7 +18,6 @@ package org.apache.activemq.leveldb.test
 
 import java.lang.Thread
 import javax.jms._
-import org.apache.activemq.leveldb.test.Scenario
 
 /**
  * <p>