You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@esme.apache.org by es...@apache.org on 2010/01/02 19:06:02 UTC
svn commit: r895247 - in /incubator/esme/trunk/server/src:
main/resources/props/ main/scala/org/apache/esme/api/
main/scala/org/apache/esme/model/ test/scala/org/apache/esme/api/
Author: esjewett
Date: Sat Jan 2 18:06:01 2010
New Revision: 895247
URL: http://svn.apache.org/viewvc?rev=895247&view=rev
Log:
[ESME-138] Implement streaming for streaming API endpoints
This patch implements streaming pools. History is implemented using Compass search. Streams are implemented using the public timeline listeners, then filtered by pool.
Modified:
incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml
incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml
incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml
incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml
incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala
incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala
incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala
Modified: incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml Sat Jan 2 18:06:01 2010
@@ -12,6 +12,7 @@
<searchEngine >
<analyzer name="default" type="Stop" />
<analyzer name="tag" type="Standard" />
+ <analyzer name="pool" type="Standard" />
<analyzer name="stemming" type="Snowball" snowballType="English">
<stopWords>
<stopWord value="no" />
Modified: incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml Sat Jan 2 18:06:01 2010
@@ -12,6 +12,7 @@
<searchEngine >
<analyzer name="default" type="Stop" />
<analyzer name="tag" type="Standard" />
+ <analyzer name="pool" type="Standard" />
<analyzer name="stemming" type="Snowball" snowballType="English">
<stopWords>
<stopWord value="no" />
Modified: incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml Sat Jan 2 18:06:01 2010
@@ -18,7 +18,8 @@
</connection>
<searchEngine >
<analyzer name="default" type="Stop" />
- <analyzer name="tag" type="Standard" />
+ <analyzer name="tag" type="Standard" />
+ <analyzer name="pool" type="Standard" />
<analyzer name="stemming" type="Snowball" snowballType="English">
<stopWords>
<stopWord value="no" />
Modified: incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml Sat Jan 2 18:06:01 2010
@@ -16,7 +16,8 @@
</connection>
<searchEngine >
<analyzer name="default" type="Stop" />
- <analyzer name="tag" type="Standard" />
+ <analyzer name="tag" type="Standard" />
+ <analyzer name="pool" type="Standard" />
<analyzer name="stemming" type="Snowball" snowballType="English">
<stopWords>
<stopWord value="no" />
Modified: incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala (original)
+++ incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala Sat Jan 2 18:06:01 2010
@@ -61,7 +61,14 @@
import scala.xml.{NodeSeq, Text, Elem, XML, Node}
import scala.collection.mutable.ListBuffer
-import java.util.logging._
+import java.util.logging._
+
+import org.compass.annotations._
+import bootstrap.liftweb.Compass.compass
+import org.compass.core._
+import lucene.util._
+import org.apache.lucene.index.TermFreqVector
+import org.tartarus.snowball.ext.PorterStemmer
object API2 extends ApiHelper with XmlHelper {
val logger: Logger = Logger.getLogger("org.apache.esme.api")
@@ -117,7 +124,13 @@
case Req("api2" :: "pools" :: Nil, _, GetRequest) => allPools
case Req("api2" :: "pools" :: Nil, _, PostRequest) => () => addPool
case Req("api2" :: "pools" :: poolId :: "users" :: Nil, _, PostRequest) => ()
- => addUserToPool(Box(List(poolId)))
+ => addUserToPool(Box(List(poolId)))
+ case Req("api2" :: "pools" :: poolId :: "messages" :: Nil, _, GetRequest)
+ if S.param("timeout").isDefined => () => waitForPoolMsgs(poolId)
+ case Req("api2" :: "pools" :: poolId :: "messages" :: Nil, _, GetRequest)
+ if S.param("history").isDefined => () => histPoolMsgs(poolId)
+ case Req("api2" :: "pools" :: poolId :: "messages" :: Nil, _, GetRequest) => ()
+ => getPoolMsgs(poolId)
case Req("api2" :: "conversations" :: conversationId :: Nil, _, GetRequest) => ()
=> getConversation(Box(List(conversationId)))
@@ -574,7 +587,101 @@
if(ret.isDefined) ret else Full((403,Map(),Empty))
r
- }
+ }
+
+ /**
+  * Returns the most recent messages of a pool, found through the Compass
+  * search index and sorted by "when" in reverse order.
+  *
+  * The "history" request parameter caps the number of messages; a missing
+  * or non-numeric value falls back to 40. The result is 200 with a
+  * <messages> element (empty if the search found nothing or failed), or
+  * 403 when no user is logged in or poolId is not a number.
+  *
+  * @param poolId pool identifier taken from the request path
+  */
+ def histPoolMsgs(poolId: String): LiftResponse = {
+   // A non-numeric id becomes Empty instead of escaping as a NumberFormatException.
+   val poolNumBox: Box[Int] =
+     try { Full(poolId.toInt) } catch { case e: NumberFormatException => Empty }
+
+   // A missing or non-numeric "history" value falls back to the default of 40.
+   val num: Int =
+     try { S.param("history").map(_.toInt) openOr 40 }
+     catch { case e: NumberFormatException => 40 }
+
+   val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+     for (user <- User.currentUser;
+          poolNum <- poolNumBox)
+     yield {
+       val boxed_lst: Box[List[Message]] =
+         for (session <- compass.map(_.openSession()))
+         yield {
+           var tx: CompassTransaction = null
+           var returnValue: List[Message] = Nil
+
+           try {
+             tx = session.beginTransaction()
+             val queryBuilder: CompassQueryBuilder = session.queryBuilder()
+
+             val query: CompassQuery = queryBuilder.bool()
+               .addMust(queryBuilder.term("pool", poolNum))
+               .toQuery()
+
+             val hitlist = query
+               .addSort("when", CompassQuery.SortPropertyType.STRING, CompassQuery.SortDirection.REVERSE)
+               .hits().detach(0, num)
+
+             val resourceList = hitlist.getResources.toList.asInstanceOf[List[Resource]]
+
+             val msgIds = resourceList.map(_.getId.toLong)
+             returnValue = Message.findMessages(msgIds).values.toList
+             tx.commit()
+           } catch {
+             case ce: CompassException =>
+               // Keep the best-effort behaviour (empty result) but leave a trace of the failure.
+               logger.log(Level.WARNING, "Compass search failed for pool " + poolNum, ce)
+               if (tx != null) tx.rollback()
+           } finally {
+             session.close()
+           }
+           returnValue
+         }
+
+       val lst: List[Message] = boxed_lst.openOr(List())
+
+       (200,Map(),Full(<messages>{lst.flatMap(msgToXml(_))}</messages>))
+     }
+
+   // Anything that left ret undefined (no user, bad pool id) is answered with 403.
+   val r: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+     if(ret.isDefined) ret else Full((403,Map(),Empty))
+
+   r
+ }
+
+ /**
+  * Returns the messages currently available from the session's pool actor.
+  *
+  * Sends the actor a ListenFor with a zero TimeSpan and waits up to 60
+  * seconds for the future to be satisfied. The result is 200 with a
+  * <messages> element, 304 when the answer is an empty list, or 403 when
+  * no user is logged in, poolId is not a number, or the future is not
+  * satisfied in time.
+  *
+  * @param poolId pool identifier taken from the request path
+  */
+ def getPoolMsgs(poolId: String): LiftResponse = {
+   val future = new LAFuture[List[(Message, MailboxReason)]]()
+
+   def waitForAnswer: Box[List[(Message, MailboxReason)]] =
+     future.get(60L * 1000L)
+
+   // A non-numeric id becomes Empty instead of escaping as a NumberFormatException.
+   val poolNumBox: Box[Long] =
+     try { Full(poolId.toLong) } catch { case e: NumberFormatException => Empty }
+
+   val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+     for (user <- User.currentUser;
+          poolNum <- poolNumBox;
+          act <- poolRestActors.findOrCreate(poolNum);
+          val ignore = act ! ListenFor(future, 0 seconds);
+          answer <- waitForAnswer)
+     yield {
+       if(answer.isEmpty) (304,Map(),Empty)
+       else (200,Map(),Full(<messages>{answer.flatMap{ case (msg, reason) => msgToXml(msg) }}</messages>))
+     }
+
+   // Anything that left ret undefined is answered with 403.
+   val r: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+     if(ret.isDefined) ret else Full((403,Map(),Empty))
+
+   r
+ }
+
+ /**
+  * Long-polls the session's pool actor for new messages.
+  *
+  * The "timeout" request parameter (seconds) is converted to milliseconds
+  * and passed to the actor in a ListenFor; the request thread itself waits
+  * up to six minutes for the future. The result is 200 with a <messages>
+  * element, 304 when the answer is an empty list, or 403 when no user is
+  * logged in, poolId or timeout is not a number, or the future is not
+  * satisfied in time.
+  *
+  * @param poolId pool identifier taken from the request path
+  */
+ def waitForPoolMsgs(poolId: String): LiftResponse = {
+   val future = new LAFuture[List[(Message, MailboxReason)]]()
+
+   def waitForAnswer: Box[List[(Message, MailboxReason)]] =
+     future.get(6L * 60L * 1000L)
+
+   // Non-numeric input becomes Empty instead of escaping as a NumberFormatException.
+   val poolNumBox: Box[Long] =
+     try { Full(poolId.toLong) } catch { case e: NumberFormatException => Empty }
+
+   val lengthBox: Box[Int] =
+     try { S.param("timeout").map(_.toInt * 1000) }
+     catch { case e: NumberFormatException => Empty }
+
+   val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+     for (user <- User.currentUser;
+          poolNum <- poolNumBox;
+          // Validate the timeout before creating or looking up the pool actor.
+          length <- lengthBox;
+          act <- poolRestActors.findOrCreate(poolNum);
+          val ignore = act ! ListenFor(future, TimeSpan(length));
+          answer <- waitForAnswer)
+     yield {
+       if(answer.isEmpty) (304,Map(),Empty)
+       else (200,Map(),Full(<messages>{answer.flatMap{ case (msg, reason) => msgToXml(msg) }}</messages>))
+     }
+
+   // Anything that left ret undefined is answered with 403.
+   val r: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+     if(ret.isDefined) ret else Full((403,Map(),Empty))
+
+   r
+ }
def getConversation(conversationId: Box[String]): LiftResponse = {
val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
@@ -614,44 +721,81 @@
val ret = new RestActor
ret ! StartUp(userId)
ret
- }
+ }
+
+ /**
+  * Creates a RestActor that uses `matcher` as its message filter and
+  * sends it StartUpPublic before handing it back.
+  */
+ private def buildPublicTimelineActor(matcher: Function1[Message,Boolean]): RestActor = {
+   val timelineActor = new RestActor(matcher)
+   timelineActor ! StartUpPublic
+   timelineActor
+ }
object messageRestActor extends SessionVar[Box[RestActor]](Empty) {
override def onShutdown(session: LiftSession) = this.is.map(_ ! ByeBye)
}
-// object tagRestActors extends SessionVar[Map[String,Box[RestActor]]](Map()) {
-// override def onShutdown(session: LiftSession) = this.is.values.map(_ ! ByeBye)
+ /**
+  * Session-scoped map from pool id to the RestActor that listens on the
+  * public timeline for that pool.
+  */
+ object poolRestActors extends SessionVar[Map[Long,RestActor]](Map()) {
+   // foreach rather than map: the sends are side effects, and mapping a lazy
+   // view of the values (an Iterator on Scala 2.7) would never perform them.
+   override def onShutdown(session: LiftSession) = this.is.values.foreach(_ ! ByeByePublic)
-// def findOrCreate(tag: String): Box[RestActor] => this.is.getOrElseUpdate(tag, createNew(tag))
+
+   // NOTE(review): hard-coded to pool 1 and not referenced in the visible code;
+   // confirm whether anything still needs it.
+   def poolMatcher(msg: Message): Boolean =
+     msg.pool == 1
-// def createNew(tag: String): (Box[RestActor]) => ("placeholder",buildActor)
-// }
+
+   /**
+    * Returns the actor registered for `pool`, creating and registering a
+    * new public-timeline actor filtered to that pool when none exists yet.
+    * The result is always Full.
+    */
+   def findOrCreate(pool: Long): Box[RestActor] = {
+     Full(this.getOrElse(pool, {
+       def partialMatcher(msg: Message) = { msg.pool == pool }
+       val newActor: RestActor = buildPublicTimelineActor(partialMatcher _)
+       this.update((oldMap) => oldMap+((pool, newActor)))
+       newActor
+     }))
+   }
+ }
- class RestActor extends LiftActor {
- private var userId: Long = _
+ class RestActor(msgMatch: Function1[Message,Boolean]) extends LiftActor {
+ private var userId: Box[Long] = Empty
private var msgs: List[(Message, MailboxReason)] = Nil
private var listener: Box[LAFuture[List[(Message, MailboxReason)]]] = Empty
-
+
+ def this() = this((msgToTest: Message) => true)
+
protected def messageHandler = {
case StartUp(userId) =>
- this.userId = userId
+ this.userId = Full(userId)
Distributor ! Distributor.Listen(userId, this)
+ case StartUpPublic =>
+ Distributor ! Distributor.PublicTimelineListeners(this)
+
case ByeBye =>
- Distributor ! Distributor.Unlisten(userId, this)
+ Distributor ! Distributor.Unlisten(userId.openOr(0), this)
+
+ case ByeByePublic =>
+ Distributor ! Distributor.PublicTimelineUnlisteners(this)
case UserActor.MessageReceived(msg, reason) =>
reason match {
case r: RegularReason => {}
case _ =>
- msgs = (msg, reason) :: msgs
+ msg match {
+ case _ if msgMatch(msg) =>
+ msgs = (msg, reason) :: msgs
+ listener.foreach {
+ who =>
+ who.satisfy(msgs)
+ listener = Empty
+ msgs = Nil
+ }
+ }
+ }
+
+ case Distributor.NewMessage(msg) =>
+ msg match {
+ case _ if msgMatch(msg) =>
+ msgs = (msg, NoReason) :: msgs
listener.foreach {
who =>
who.satisfy(msgs)
listener = Empty
msgs = Nil
- }
+ }
}
case ReleaseListener =>
@@ -674,8 +818,10 @@
}
- private case class StartUp(userId: Long)
- private case object ByeBye
+ // Sent to a new RestActor so it starts listening to one user's messages.
+ private case class StartUp(userId: Long)
+ // Sent to a new RestActor so it registers as a public timeline listener.
+ private case object StartUpPublic
+ // Sent on session shutdown to undo the per-user listener registration.
+ private case object ByeBye
+ // Sent on session shutdown to undo the public timeline registration.
+ private case object ByeByePublic
+ // Asks a RestActor to satisfy `who` with its messages; `howLong` is the
+ // caller-supplied wait (handler not shown in this excerpt).
private case class ListenFor(who: LAFuture[List[(Message, MailboxReason)]],
howLong: TimeSpan)
private case object ReleaseListener
Modified: incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala (original)
+++ incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala Sat Jan 2 18:06:01 2010
@@ -480,7 +480,10 @@
def getTags:String = {
// Create a string of space-separated tags, with the spaces in each tag converted to underscores
tags.map(x => x.split(" ").mkString("_")) mkString " "
- }
+ }
+
+ // Exposes the message's pool value as a Compass searchable property,
+ // indexed with the "pool" analyzer and with term vectors enabled.
+ @SearchableProperty{val termVector=TermVector.YES, val analyzer="pool"}
+ def getPool = pool.is
/**
* Parse and format into XML
Modified: incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala (original)
+++ incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala Sat Jan 2 18:06:01 2010
@@ -275,7 +275,7 @@
mess_res1 <- session.post("user/messages", "message" -> "test message")
timeout <- sleep(2000)
mess_res <- session.get("user/messages")
- } {
+ } {
mess_res.code must be equalTo 200
// Message structure
@@ -648,5 +648,98 @@
}
}
}
+
+ // Specs for fetching a pool's new messages without a timeout or history parameter.
+ "/pools/POOLID/messages GET" in {
+   "with valid session and new messages" in {
+     // Create the pool, prime its listener, post into the pool, then fetch.
+     for (session <- post_session;
+          created <- session.post("pools", "poolName" -> "test_pool3");
+          primed <- session.get("pools/3/messages");
+          firstPause <- sleep(2000);
+          posted <- session.post("user/messages",
+                                 "message" -> "test message for pool delta",
+                                 "pool" -> "test_pool3");
+          secondPause <- sleep(2000);
+          fetched <- session.get("pools/3/messages"))
+     {
+       fetched.code must be equalTo 200
+
+       // Message structure
+       (fetched.xml \ "messages") must \\(<id>{theUser.id.toString}</id>)
+       (fetched.xml \ "messages") must \\(<body>test message for pool delta</body>)
+     }
+   }
+
+   "with no session returns 403 (forbidden)" in {
+     for {
+       anonymous <- get("pools/1/messages")
+     } {
+       anonymous.code must be equalTo 403
+     }
+   }
+
+   // Should be a 304, but this response type isn't implemented in Lift yet...
+   "when no new messages exist, returns 204 (no content)" in {
+     for {
+       session <- post_session
+       first <- session.get("pools/1/messages")
+       second <- session.get("pools/1/messages")
+     } {
+       second.code must be equalTo 204
+     }
+   }
+ }
+
+ // Specs for fetching a pool's message history through the search index.
+ "/pools/POOLID/messages?history=10 GET" in {
+   "with valid session" in {
+     // Create the pool, post one message into it, then ask for its history.
+     for (session <- post_session;
+          created <- session.post("pools", "poolName" -> "test_pool4");
+          posted <- session.post("user/messages",
+                                 "message" -> "test message for pool history",
+                                 "pool" -> "test_pool4");
+          history <- session.get("pools/4/messages?history=10"))
+     {
+       history.code must be equalTo 200
+       (history.xml \ "messages") must \\(<id>{theUser.id.toString}</id>)
+       (history.xml \ "messages") must \\(<body>test message for pool history</body>)
+     }
+   }
+
+   "with no session returns 403 (forbidden)" in {
+     for {
+       anonymous <- get("pools/1/messages?history=10")
+     } {
+       anonymous.code must be equalTo 403
+     }
+   }
+ }
+
+ // Specs for long-polling a pool's messages with a timeout parameter.
+ "/pools/POOLID/messages?timeout=2 GET" in {
+   "with valid session" in {
+     for{
+       sess <- post_session
+       pool_res <- sess.post("pools", "poolName" -> "test_pool5")
+       // Prime the pool listener before posting. The path must be "pools/...",
+       // matching the dispatch route; the singular "pool/..." hit no pool endpoint.
+       init <- sess.get("pools/5/messages")
+       mess_res <- sess.post("user/messages",
+                             "message" -> "test message for pool timeout",
+                             "pool" -> "test_pool5")
+       res <- sess.get("pools/5/messages?timeout=2")
+     } {
+       res.code must be equalTo 200
+     }
+   }
+
+   "with no session returns 403 (forbidden)" in {
+     for (session_res <- get("pools/1/messages?timeout=2")) {
+       session_res.code must be equalTo 403
+     }
+   }
+
+   // Should be a 304, but this response type isn't implemented in Lift yet...
+   "when no new messages exist, returns 204 (no content)" in {
+     for (session <- post_session;
+          session_res1 <- session.get("pools/1/messages");
+          session_res <- session.get("pools/1/messages?timeout=2"))
+     {
+       session_res.code must be equalTo 204
+     }
+   }
+ }
}
}
\ No newline at end of file