You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@esme.apache.org by es...@apache.org on 2010/01/02 19:06:02 UTC

svn commit: r895247 - in /incubator/esme/trunk/server/src: main/resources/props/ main/scala/org/apache/esme/api/ main/scala/org/apache/esme/model/ test/scala/org/apache/esme/api/

Author: esjewett
Date: Sat Jan  2 18:06:01 2010
New Revision: 895247

URL: http://svn.apache.org/viewvc?rev=895247&view=rev
Log:
[ESME-138] Implement streaming for streaming API endpoints \nThis patch implements streaming pools. History is implemented using Compass search. Streams are implemented using the public timeline listeners, then filtering by pool.

Modified:
    incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml
    incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml
    incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml
    incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml
    incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala
    incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala
    incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala

Modified: incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.cfg.xml Sat Jan  2 18:06:01 2010
@@ -12,6 +12,7 @@
     <searchEngine >
       <analyzer name="default" type="Stop" />
       <analyzer name="tag" type="Standard" />
+      <analyzer name="pool" type="Standard" />
       <analyzer name="stemming" type="Snowball" snowballType="English">
             <stopWords>
                 <stopWord value="no" />

Modified: incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.filesystem.cfg.xml Sat Jan  2 18:06:01 2010
@@ -12,6 +12,7 @@
     <searchEngine >
       <analyzer name="default" type="Stop" />
       <analyzer name="tag" type="Standard" />
+      <analyzer name="pool" type="Standard" />
       <analyzer name="stemming" type="Snowball" snowballType="English">
             <stopWords>
                 <stopWord value="no" />

Modified: incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.jdbc.cfg.xml Sat Jan  2 18:06:01 2010
@@ -18,7 +18,8 @@
     </connection>
     <searchEngine >
       <analyzer name="default" type="Stop" />
-      <analyzer name="tag" type="Standard" />
+      <analyzer name="tag" type="Standard" />  
+      <analyzer name="pool" type="Standard" />
       <analyzer name="stemming" type="Snowball" snowballType="English">
             <stopWords>
                 <stopWord value="no" />

Modified: incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml (original)
+++ incubator/esme/trunk/server/src/main/resources/props/compass.jndi.cfg.xml Sat Jan  2 18:06:01 2010
@@ -16,7 +16,8 @@
     </connection>
     <searchEngine >
       <analyzer name="default" type="Stop" />
-      <analyzer name="tag" type="Standard" />
+      <analyzer name="tag" type="Standard" /> 
+      <analyzer name="pool" type="Standard" />
       <analyzer name="stemming" type="Snowball" snowballType="English">
             <stopWords>
                 <stopWord value="no" />

Modified: incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala (original)
+++ incubator/esme/trunk/server/src/main/scala/org/apache/esme/api/API2.scala Sat Jan  2 18:06:01 2010
@@ -61,7 +61,14 @@
 import scala.xml.{NodeSeq, Text, Elem, XML, Node}
 
 import scala.collection.mutable.ListBuffer
-import java.util.logging._
+import java.util.logging._      
+
+import org.compass.annotations._
+import bootstrap.liftweb.Compass.compass
+import org.compass.core._
+import lucene.util._
+import org.apache.lucene.index.TermFreqVector
+import org.tartarus.snowball.ext.PorterStemmer
 
 object API2 extends ApiHelper with XmlHelper {
   val logger: Logger = Logger.getLogger("org.apache.esme.api")
@@ -117,7 +124,13 @@
     case Req("api2" :: "pools" :: Nil, _, GetRequest) => allPools 
     case Req("api2" :: "pools" :: Nil, _, PostRequest) => () => addPool 
     case Req("api2" :: "pools" :: poolId :: "users" :: Nil, _, PostRequest) => () 
-			=> addUserToPool(Box(List(poolId))) 
+			=> addUserToPool(Box(List(poolId)))
+    case Req("api2" :: "pools" :: poolId :: "messages" :: Nil, _, GetRequest)
+ 	  if S.param("timeout").isDefined => () => waitForPoolMsgs(poolId)
+    case Req("api2" :: "pools" :: poolId :: "messages" :: Nil, _, GetRequest)
+      if S.param("history").isDefined => () => histPoolMsgs(poolId)   
+    case Req("api2" :: "pools" :: poolId :: "messages" :: Nil, _, GetRequest) => () 
+            => getPoolMsgs(poolId)
     
     case Req("api2" :: "conversations" :: conversationId :: Nil, _, GetRequest) => () 
 			=> getConversation(Box(List(conversationId)))
@@ -574,7 +587,101 @@
 	  if(ret.isDefined) ret else Full((403,Map(),Empty))
 
 	r
-  }  
+  }      
+
+  def histPoolMsgs(poolId: String): LiftResponse = {
+    val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+      for (user <- User.currentUser;
+        val poolNum = poolId.toInt;
+		val num = S.param("history").map(_.toInt) openOr 40)		
+      yield {
+        val boxed_lst: Box[List[Message]] = 
+        for(session <- compass.map(_.openSession()); user <- User.currentUser)
+        yield {
+          var tx:CompassTransaction = null
+          var returnValue:List[Message] = Nil
+
+          try {
+            tx = session.beginTransaction()
+            val queryBuilder: CompassQueryBuilder = session.queryBuilder()
+
+            val query: CompassQuery = queryBuilder.bool()
+              .addMust(queryBuilder.term("pool", poolNum))
+              .toQuery()
+            
+            val hitlist = query
+              .addSort("when", CompassQuery.SortPropertyType.STRING, CompassQuery.SortDirection.REVERSE)
+              .hits().detach(0, num)
+             
+            val resourceList = hitlist.getResources.toList.asInstanceOf[List[Resource]]
+
+            val msgIds = resourceList.map(_.getId.toLong)
+            returnValue = Message.findMessages(msgIds).values.toList
+            tx.commit();
+          } catch  {
+            case ce: CompassException =>
+              if (tx != null) tx.rollback();
+          } finally {
+            session.close();
+          }
+          returnValue
+        }
+
+        val lst: List[Message] = boxed_lst.openOr(List()) 
+
+        (200,Map(),Full(<messages>{lst.flatMap(msgToXml(_))}</messages>))
+      } 
+
+    val r: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+      if(ret.isDefined) ret else Full((403,Map(),Empty))
+
+    r
+  }    
+
+  def getPoolMsgs(poolId: String): LiftResponse = {
+    val future = new LAFuture[List[(Message, MailboxReason)]]()
+  
+    def waitForAnswer: Box[List[(Message, MailboxReason)]] = 
+      future.get(60L * 1000L)
+
+    val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] = 
+      for (user <- User.currentUser;     
+           act <- poolRestActors.findOrCreate(poolId.toLong);
+		   val ignore = act ! ListenFor(future, 0 seconds);
+	       answer <- waitForAnswer) 
+      yield { 
+        if(answer.isEmpty) (304,Map(),Empty)          
+        else (200,Map(),Full(<messages>{answer.flatMap{ case (msg, reason) => msgToXml(msg) }}</messages>))
+      }
+
+    val r: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+      if(ret.isDefined) ret else Full((403,Map(),Empty))
+
+    r
+  } 
+
+  def waitForPoolMsgs(poolId: String): LiftResponse = {
+    val future = new LAFuture[List[(Message, MailboxReason)]]()
+    
+    def waitForAnswer: Box[List[(Message, MailboxReason)]] = 
+      future.get(6L * 60L * 1000L)
+
+    val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =  
+      for (user <- User.currentUser;
+           act <- poolRestActors.findOrCreate(poolId.toLong);
+	       length <- S.param("timeout").map(_.toInt * 1000);
+           val ignore = act ! ListenFor(future, TimeSpan(length));
+           answer <- waitForAnswer)
+      yield {
+        if(answer.isEmpty) (304,Map(),Empty)          
+        else (200,Map(),Full(<messages>{answer.flatMap{ case (msg, reason) => msgToXml(msg) }}</messages>))
+      }
+
+    val r: Box[Tuple3[Int,Map[String,String],Box[Elem]]] =
+      if(ret.isDefined) ret else Full((403,Map(),Empty))
+
+    r
+  }
 
   def getConversation(conversationId: Box[String]): LiftResponse = {
     val ret: Box[Tuple3[Int,Map[String,String],Box[Elem]]] = 
@@ -614,44 +721,81 @@
     val ret = new RestActor
     ret ! StartUp(userId)
     ret
-  }                    
+  } 
+
+  private def buildPublicTimelineActor(matcher: Function1[Message,Boolean]): RestActor = {
+    val ret = new RestActor(matcher)
+    ret ! StartUpPublic
+    ret
+  }                  
 
   object messageRestActor extends SessionVar[Box[RestActor]](Empty) {
     override def onShutdown(session: LiftSession) = this.is.map(_ ! ByeBye)
   }
   
-//  object tagRestActors extends SessionVar[Map[String,Box[RestActor]]](Map()) {
-//    override def onShutdown(session: LiftSession) = this.is.values.map(_ ! ByeBye)
+  object poolRestActors extends SessionVar[Map[Long,RestActor]](Map()) {
+    override def onShutdown(session: LiftSession) = this.is.values.map(_ ! ByeByePublic)
 
-//    def findOrCreate(tag: String): Box[RestActor] => this.is.getOrElseUpdate(tag, createNew(tag))
+    def poolMatcher(msg: Message): Boolean =
+      msg.pool == 1
 
-//    def createNew(tag: String): (Box[RestActor]) => ("placeholder",buildActor) 
-//  }
+    def findOrCreate(pool: Long): Box[RestActor] = {
+      Full(this.getOrElse(pool, {
+        def partialMatcher(msg: Message) = { msg.pool == pool }
+        val newActor: RestActor = buildPublicTimelineActor(partialMatcher _)
+        this.update((oldMap) => oldMap+((pool, newActor)))
+        newActor
+      }))                                                               
+    }
+  }
 
-  class RestActor extends LiftActor {
-    private var userId: Long = _
+  class RestActor(msgMatch: Function1[Message,Boolean]) extends LiftActor {
+    private var userId: Box[Long] = Empty
     private var msgs: List[(Message, MailboxReason)] = Nil
     private var listener: Box[LAFuture[List[(Message, MailboxReason)]]] = Empty
-    
+
+    def this() = this((msgToTest: Message) => true)                      
+
     protected def messageHandler = {
       case StartUp(userId) =>
-        this.userId = userId
+        this.userId = Full(userId)
         Distributor ! Distributor.Listen(userId, this)
 
+      case StartUpPublic =>
+        Distributor ! Distributor.PublicTimelineListeners(this)
+
       case ByeBye =>
-        Distributor ! Distributor.Unlisten(userId, this)
+        Distributor ! Distributor.Unlisten(userId.openOr(0), this)  
+
+      case ByeByePublic =>
+        Distributor ! Distributor.PublicTimelineUnlisteners(this)
           
       case UserActor.MessageReceived(msg, reason) =>
         reason match {
           case r: RegularReason => {}
           case _ =>
-            msgs = (msg, reason) :: msgs                          
+            msg match {
+              case _ if msgMatch(msg) =>
+                msgs = (msg, reason) :: msgs                          
+                listener.foreach {
+                  who =>
+                    who.satisfy(msgs)
+                    listener = Empty
+                    msgs = Nil
+                }
+            }
+        } 
+                                                                 
+      case Distributor.NewMessage(msg) =>
+        msg match {
+          case _ if msgMatch(msg) =>
+            msgs = (msg, NoReason) :: msgs                          
             listener.foreach {
               who =>
                 who.satisfy(msgs)
                 listener = Empty
                 msgs = Nil
-            }     
+            }
         }
       
       case ReleaseListener =>
@@ -674,8 +818,10 @@
   }
 
 
-  private case class StartUp(userId: Long)
-  private case object ByeBye
+  private case class StartUp(userId: Long) 
+  private case object StartUpPublic
+  private case object ByeBye      
+  private case object ByeByePublic
   private case class ListenFor(who: LAFuture[List[(Message, MailboxReason)]],
 			       howLong: TimeSpan)                                  
   private case object ReleaseListener       

Modified: incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala (original)
+++ incubator/esme/trunk/server/src/main/scala/org/apache/esme/model/Message.scala Sat Jan  2 18:06:01 2010
@@ -480,7 +480,10 @@
   def getTags:String = {
     // Create a string of space-separated tags, with the spaces in each tag converted to underscores
     tags.map(x => x.split(" ").mkString("_")) mkString " "
-  }
+  } 
+
+  @SearchableProperty{val termVector=TermVector.YES, val analyzer="pool"}
+  def getPool = pool.is
 
   /**
    * Parse and format into XML

Modified: incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala
URL: http://svn.apache.org/viewvc/incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala?rev=895247&r1=895246&r2=895247&view=diff
==============================================================================
--- incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala (original)
+++ incubator/esme/trunk/server/src/test/scala/org/apache/esme/api/API2Test.scala Sat Jan  2 18:06:01 2010
@@ -275,7 +275,7 @@
           mess_res1 <- session.post("user/messages", "message" -> "test message") 
           timeout <- sleep(2000)
           mess_res <- session.get("user/messages")
-        } {    
+        } {                  
           mess_res.code must be equalTo 200
 
           // Message structure   
@@ -648,5 +648,98 @@
         }
       }
     }
+
+    "/pools/POOLID/messages GET" in {
+	  "with valid session and new messages" in {
+	    for{
+	      sess <- post_session 
+     	  pool_res <- sess.post("pools", "poolName" -> "test_pool3") 
+          init <- sess.get("pools/3/messages")
+          timeout <- sleep(2000)
+		  mess_res1 <- sess.post("user/messages",
+            "message" -> "test message for pool delta",
+            "pool" -> "test_pool3") 
+          timeout <- sleep(2000)
+          mess_res <- sess.get("pools/3/messages")
+        } {                                   
+          mess_res.code must be equalTo 200
+
+          // Message structure   
+          (mess_res.xml \ "messages") must \\(<id>{theUser.id.toString}</id>)
+          (mess_res.xml \ "messages") must \\(<body>test message for pool delta</body>)
+        }
+      }
+
+      "with no session returns 403 (forbidden)" in {
+        for (session_res <- get("pools/1/messages")) {
+          session_res.code must be equalTo 403
+        }
+      }
+
+      // Should be a 304, but this response type isn't implemented in Lift yet...
+      "when no new messages exist, returns 204 (no content)" in {
+        for (session <- post_session;
+             session_res1 <- session.get("pools/1/messages");
+             session_res <- session.get("pools/1/messages"))
+        {             
+          session_res.code must be equalTo 204
+        }                                                   
+      }
+    }
+
+    "/pools/POOLID/messages?history=10 GET" in {
+      "with valid session" in {
+        for{
+          sess <- post_session 
+          pool_res <- sess.post("pools", "poolName" -> "test_pool4") 
+		  mess_res <- sess.post("user/messages",
+            "message" -> "test message for pool history",
+            "pool" -> "test_pool4")
+          res <- sess.get("pools/4/messages?history=10")
+        } {                             
+          res.code must be equalTo 200  
+          (res.xml \ "messages") must \\(<id>{theUser.id.toString}</id>)
+          (res.xml \ "messages") must \\(<body>test message for pool history</body>)
+        }
+      }
+
+      "with no session returns 403 (forbidden)" in {
+        for (session_res <- get("pools/1/messages?history=10")) {
+          session_res.code must be equalTo 403
+        }
+      }
+    }
+
+    "/pools/POOLID/messages?timeout=2 GET" in {
+      "with valid session" in {
+        for{
+          sess <- post_session 
+          pool_res <- sess.post("pools", "poolName" -> "test_pool5")    
+          init <- sess.get("pool/5/messages") 
+		  mess_res <- sess.post("user/messages",
+            "message" -> "test message for pool timeout",
+            "pool" -> "test_pool5") 
+          res <- sess.get("pools/5/messages?timeout=2")
+        } {                               
+          res.code must be equalTo 200
+        }
+      }
+
+      "with no session returns 403 (forbidden)" in {
+        for (session_res <- get("pools/1/messages?timeout=2")) {
+          session_res.code must be equalTo 403
+        }
+      }
+
+	// Should be a 304, but this response type isn't implemented in Lift yet...
+      "when no new messages exist, returns 204 (no content)" in {
+        for (session <- post_session;
+          session_res1 <- session.get("pools/1/messages");
+          session_res <- session.get("pools/1/messages?timeout=2"))
+        {             
+          session_res.code must be equalTo 204
+        }                                                   
+      }
+    }   
   }
 }
\ No newline at end of file