You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/10 00:17:48 UTC
svn commit: r1480811 - in /activemq/trunk/activemq-leveldb-store/src:
main/scala/org/apache/activemq/leveldb/
test/scala/org/apache/activemq/leveldb/test/
Author: chirino
Date: Thu May 9 22:17:47 2013
New Revision: 1480811
URL: http://svn.apache.org/r1480811
Log:
Fixes AMQ-4251: Scala compile warnings - Compiling activemq-leveldb-store
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala
activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Thu May 9 22:17:47 2013
@@ -34,10 +34,11 @@ import org.apache.activemq.util.ByteSequ
import util.TimeMetric
import scala.Some
+case class EntryLocator(qid:Long, seq:Long)
+case class DataLocator(pos:Long, len:Int)
case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
- var locator:(Long, Int) = _
+ var locator:DataLocator = _
}
-
case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
case class QueueEntryRange()
@@ -266,13 +267,13 @@ class DelayableUOW(val manager:DBManager
record
case record:MessageRecord =>
record
- case x:(Long, Int) =>
+ case x:DataLocator =>
null
}
val entry = QueueEntryRecord(id, queueKey, queueSeq)
assert(id.getEntryLocator == null)
- id.setEntryLocator((queueKey, queueSeq))
+ id.setEntryLocator(EntryLocator(queueKey, queueSeq))
val a = this.synchronized {
if( !delay )
@@ -293,7 +294,7 @@ class DelayableUOW(val manager:DBManager
}
def dequeue(expectedQueueKey:Long, id:MessageId) = {
- val (queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[(Long, Long)];
+ val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
assert(queueKey == expectedQueueKey)
val entry = QueueEntryRecord(id, queueKey, queueSeq)
this.synchronized {
@@ -670,7 +671,7 @@ class DBManager(val parent:LevelDBStore)
}
def queuePosition(id: MessageId):Long = {
- id.getEntryLocator.asInstanceOf[(Long, Long)]._2
+ id.getEntryLocator.asInstanceOf[EntryLocator].seq
}
def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu May 9 22:17:47 2013
@@ -1094,9 +1094,9 @@ class LevelDBClient(store: LevelDBStore)
def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
val seq = decodeLong(key)
- var locator = (value.getValueLocation, value.getValueLength)
+ var locator = DataLocator(value.getValueLocation, value.getValueLength)
val msg = getMessage(locator)
- msg.getMessageId().setEntryLocator((collectionKey, seq))
+ msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
func(msg)
}
@@ -1114,14 +1114,14 @@ class LevelDBClient(store: LevelDBStore)
val seq = is.readLong()
val sub = is.readLong()
val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
- ack.getLastMessageId.setDataLocator((log, offset))
- ack.getLastMessageId.setEntryLocator((qid, seq))
+ ack.getLastMessageId.setDataLocator(DataLocator(log, offset))
+ ack.getLastMessageId.setEntryLocator(EntryLocator(qid, seq))
func(XaAckRecord(collectionKey, seq, ack, sub))
} else {
- var locator = (value.getValueLocation, value.getValueLength)
+ var locator = DataLocator(value.getValueLocation, value.getValueLength)
val msg = getMessage(locator)
- msg.getMessageId().setEntryLocator((collectionKey, seq))
+ msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
func(msg)
}
@@ -1143,7 +1143,7 @@ class LevelDBClient(store: LevelDBStore)
case x:MessageRecord =>
// Encoded form is still in memory..
Some(x.data)
- case (pos:Long, len:Int) =>
+ case DataLocator(pos, len) =>
// Load the encoded form from disk.
log.read(pos, len).map(new Buffer(_))
}
@@ -1214,26 +1214,26 @@ class LevelDBClient(store: LevelDBStore)
val messageRecord = action.messageRecord
var log_info:LogInfo = null
var pos = -1L
- var dataLocator:(Long, Int) = null
+ var dataLocator:DataLocator = null
if (messageRecord != null && messageRecord.locator==null) {
val start = System.nanoTime()
val p = appender.append(LOG_DATA, messageRecord.data)
pos = p._1
log_info = p._2
- dataLocator = (pos, messageRecord.data.length)
+ dataLocator = DataLocator(pos, messageRecord.data.length)
messageRecord.locator = dataLocator
write_message_total += System.nanoTime() - start
}
action.dequeues.foreach { entry =>
- val keyLocation = entry.id.getEntryLocator.asInstanceOf[(Long, Long)]
- val key = encodeEntryKey(ENTRY_PREFIX, keyLocation._1, keyLocation._2)
+ val keyLocation = entry.id.getEntryLocator.asInstanceOf[EntryLocator]
+ val key = encodeEntryKey(ENTRY_PREFIX, keyLocation.qid, keyLocation.seq)
if( dataLocator==null ) {
dataLocator = entry.id.getDataLocator match {
- case x:(Long, Int) => x
+ case x:DataLocator => x
case x:MessageRecord => x.locator
case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
}
@@ -1242,11 +1242,11 @@ class LevelDBClient(store: LevelDBStore)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.queueKey)
log_record.setEntryKey(new Buffer(key, 9, 8))
- log_record.setValueLocation(dataLocator._1)
+ log_record.setValueLocation(dataLocator.pos)
appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
batch.delete(key)
- logRefDecrement(dataLocator._1)
+ logRefDecrement(dataLocator.pos)
collectionDecrementSize(entry.queueKey)
}
@@ -1254,7 +1254,7 @@ class LevelDBClient(store: LevelDBStore)
if(dataLocator ==null ) {
dataLocator = entry.id.getDataLocator match {
- case x:(Long, Int) => x
+ case x:DataLocator => x
case x:MessageRecord => x.locator
case _ =>
throw new RuntimeException("Unexpected locator type")
@@ -1270,13 +1270,13 @@ class LevelDBClient(store: LevelDBStore)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.queueKey)
log_record.setEntryKey(new Buffer(key, 9, 8))
- log_record.setValueLocation(dataLocator._1)
- log_record.setValueLength(dataLocator._2)
+ log_record.setValueLocation(dataLocator.pos)
+ log_record.setValueLength(dataLocator.len)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
- index_record.setValueLocation(dataLocator._1)
- index_record.setValueLength(dataLocator._2)
+ index_record.setValueLocation(dataLocator.pos)
+ index_record.setValueLength(dataLocator.len)
batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
val log_data = encodeEntryRecord(log_record.freeze())
@@ -1297,7 +1297,7 @@ class LevelDBClient(store: LevelDBStore)
val ack = entry.ack
if( dataLocator==null ) {
dataLocator = ack.getLastMessageId.getDataLocator match {
- case x:(Long, Int) => x
+ case x:DataLocator => x
case x:MessageRecord => x.locator
case _ =>
throw new RuntimeException("Unexpected locator type")
@@ -1305,12 +1305,12 @@ class LevelDBClient(store: LevelDBStore)
}
println(dataLocator)
- val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
+ val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
val os = new DataByteArrayOutputStream()
- os.writeLong(dataLocator._1)
- os.writeInt(dataLocator._2)
- os.writeLong(qid)
- os.writeLong(seq)
+ os.writeLong(dataLocator.pos)
+ os.writeInt(dataLocator.len)
+ os.writeLong(el.qid)
+ os.writeLong(el.seq)
os.writeLong(entry.sub)
store.wireFormat.marshal(ack, os)
var ack_encoded = os.toBuffer
Modified: activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala Thu May 9 22:17:47 2013
@@ -19,7 +19,6 @@ package org.apache.activemq.leveldb.test
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.{Destination, ConnectionFactory}
import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
-import org.apache.activemq.leveldb.test.JMSClientScenario
/**
* <p>
Modified: activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala?rev=1480811&r1=1480810&r2=1480811&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala Thu May 9 22:17:47 2013
@@ -18,7 +18,6 @@ package org.apache.activemq.leveldb.test
import java.lang.Thread
import javax.jms._
-import org.apache.activemq.leveldb.test.Scenario
/**
* <p>