You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@esme.apache.org by le...@apache.org on 2012/08/20 13:16:35 UTC

svn commit: r1374973 - in /esme/branches/akka: ./ src/main/scala/bootstrap/liftweb/ src/main/scala/org/apache/esme/actor/ src/main/scala/org/apache/esme/lib/ src/main/scala/org/apache/esme/model/

Author: lester
Date: Mon Aug 20 11:16:34 2012
New Revision: 1374973

URL: http://svn.apache.org/viewvc?rev=1374973&view=rev
Log:
In scope of ESME-360: Began work  on XMPP Consumer component action.

Added:
    esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
    esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
Modified:
    esme/branches/akka/build.sbt
    esme/branches/akka/pom.xml
    esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
    esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
    esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
    esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala

Modified: esme/branches/akka/build.sbt
URL: http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/build.sbt (original)
+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012
@@ -8,7 +8,7 @@ version := "1.4"
 
 organization := "Apache Software Foundation"
 
-scalaVersion := "2.9.1"    
+scalaVersion := "2.9.1"
 
 //scalacOptions ++= Seq("-unchecked", "-deprecation")  
 scalacOptions ++= Seq("-deprecation") 
@@ -28,7 +28,7 @@ libraryDependencies ++= {
   val compassVersion = "2.1.1"
   val luceneVersion = "2.4.0"
   val scalazVersion = "6.0.4"
-  val akkaVersion = "2.0.2"
+  val akkaVersion = "2.1-20120701-002745"
   val eclipsejettyVersion = "7.3.1.v20110307"
   val mortbayjettyVersion = "6.1.22"
   val slf4jVersion = "1.6.4" 
@@ -47,7 +47,7 @@ libraryDependencies ++= {
     "net.liftweb" %% "lift-textile" % liftVersion % "compile->default",
     "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
     "com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default",
-    "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" % "compile->default",
+    "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % "compile->default",
     "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
     "javax.servlet" % "servlet-api" % "2.5" % "provided->default",
     "org.compass-project" % "compass" % compassVersion % "compile->default",

Modified: esme/branches/akka/pom.xml
URL: http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/pom.xml (original)
+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012
@@ -76,7 +76,7 @@
         <lift.version>2.4</lift.version>
         <scala.version>2.9.1</scala.version>
         <scalaz.version>6.0.4</scalaz.version>
-        <akka.version>2.1-SNAPSHOT</akka.version>
+        <akka.version>2.1-20120701-002745</akka.version>
         <compass.version>2.1.1</compass.version>
         <lucene.version>2.4.0</lucene.version>
         <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server>
@@ -91,17 +91,6 @@
             <url>http://repo2.maven.org/maven2/</url>
         </repository>
         <repository>
-            <id>scala-tools.org</id>
-            <name>Scala-Tools Maven2 Repository</name>
-            <url>http://scala-tools.org/repo-releases</url>
-        </repository>
-        <repository>
-            <id>scala-tools.org.snapshots</id>
-            <name>Scala-Tools Maven2 Repository for Snapshots</name>
-            <url>http://scala-tools.org/repo-snapshots</url>
-            <snapshots/>
-        </repository>
-        <repository>
             <id>typesafe</id>
             <name>Typesafe Repository Releases</name>
             <url>http://repo.typesafe.com/typesafe/releases</url>
@@ -135,9 +124,9 @@
 
     <pluginRepositories>
         <pluginRepository>
-            <id>scala-tools.org</id>
-            <name>Scala-Tools Maven2 Repository</name>
-            <url>http://scala-tools.org/repo-releases</url>
+            <id>typesafe</id>
+            <name>Typesafe Repository Releases</name>
+            <url>http://repo.typesafe.com/typesafe/releases</url>
         </pluginRepository>
     </pluginRepositories>
 
@@ -232,7 +221,7 @@
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-camel</artifactId>
-            <version>2.1-SNAPSHOT</version>
+            <version>${akka.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>

Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala (original)
+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon Aug 20 11:16:34 2012
@@ -27,6 +27,8 @@ import net.liftweb.http.auth._
 import net.liftweb.sitemap._
 import net.liftweb.sitemap.Loc._
 import Helpers._
+import akka.actor.{Props => AkkaProps, ActorSystem}
+
 //import TimeHelpers.intToTimeSpanBuilder
 //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier, DefaultConnectionIdentifier, ConnectionIdentifier}
 import java.sql.{Connection, DriverManager}
@@ -249,6 +251,8 @@ class Boot extends Loggable {
     ConvDistributor.touch
     // ScalaInterpreter.touch
 
+    val sys = ActorSystem("camel")
+    val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
 
     // Initiating popular links and resent messages
     val resentPeriod = Props.getLong("stats.resent.period", 1 week)

Modified: esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala (original)
+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala Mon Aug 20 11:16:34 2012
@@ -64,8 +64,8 @@ object UserActor {
   val xmppUsr = Props.get("xmpp.user") openOr ""
   val xmppPwd = Props.get("xmpp.password") openOr ""
   val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
-  lazy val sys = ActorSystem("camel")
-  lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)))
+  val sys = ActorSystem("camel")
+  val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
 }
 
 
@@ -289,8 +289,14 @@ class UserActor extends LiftActor {
               Distributor !
               Distributor.AddMessageToMailbox(id, msg, ResendReason(userId))
 
-            case FetchFeed(_, _) => 
-              MessagePullActor ! MessagePullActor.Fetch(td.performId)    
+            case XmppFrom(_) => {
+              val sys = ActorSystem("camel")
+              sys.actorFor("XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
+            }
+
+
+            case FetchFeed(_, _) =>
+              MessagePullActor ! MessagePullActor.Fetch(td.performId)
 
             case ScalaInterpret => logger.info("Scala interpreter is disabled!")
             /*if (msg.source.is != "scala")

Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala (added)
+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala Mon Aug 20 11:16:34 2012
@@ -0,0 +1,51 @@
+package org.apache.esme.actor
+
+import akka.camel.{CamelMessage, Consumer}
+
+import net.liftweb.common.{Empty, Logger}
+import collection.immutable.Queue
+import org.apache.esme.actor.Distributor.UserCreatedMessage
+import org.apache.esme.model.User
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: lester
+ * Date: 17.08.12
+ * Time: 2:26
+ */
+
+object XmppReceiver {
+  val logger: Logger = Logger("org.apache.esme.actor")
+  case class FetchMessages()
+}
+
+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String, esmePwd: String, xmppServiceName: String, participant: String, user: User) extends Consumer {
+
+  import XmppReceiver._
+
+  var messages: List[(String, Long)] = List.empty
+
+  def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: %s".format(uri)); uri}
+
+  def receive = {
+    case msg: CamelMessage => {
+      messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages
+    }
+    case FetchMessages => {
+      messages.foreach(message =>
+        Distributor ! UserCreatedMessage(
+          if (user != null) {user.id} else 0,
+          message._1,
+          Nil,
+          message._2,
+          Empty,
+          participant,
+          Empty,
+          None
+        )
+      )
+      messages = List.empty
+    }
+    case _ => logger.error("Incoming message is not Camel Message!")
+  }
+}
\ No newline at end of file

Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala (added)
+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala Mon Aug 20 11:16:34 2012
@@ -0,0 +1,66 @@
+package org.apache.esme.actor
+
+import akka.actor.{ActorRef, Actor, Props => AkkaProps}
+import net.liftweb.util.Props
+import org.apache.esme.actor.XmppReceiver.FetchMessages
+import org.apache.esme.model.User
+import net.liftweb.common.Logger
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: lester
+ * Date: 17.08.12
+ * Time: 4:21
+ */
+
+object XmppSupervisor {
+  val logger: Logger = Logger("org.apache.esme.actor")
+
+
+  sealed trait XmppSupervisorActions
+  case class Fetch(id: Long) extends XmppSupervisorActions
+  case class Start(id: Long, who: String, usr: User) extends XmppSupervisorActions
+  case class Stop(id: Long) extends XmppSupervisorActions
+}
+
+class XmppSupervisor extends Actor {
+
+  import XmppSupervisor._
+
+  private var xmppPullActors: Map[Long, ActorRef] = Map.empty
+
+  var xmppHost: String = _
+  var xmppPort: String = _
+  var xmppUsr: String = _
+  var xmppPwd: String = _
+  var xmppServiceName: String = _
+
+
+  override def preStart() {
+    logger.info("preStart() called")
+
+    xmppHost = Props.get("xmpp.host") openOr ""
+    xmppPort = Props.get("xmpp.port") openOr ""
+    xmppUsr = Props.get("xmpp.user") openOr ""
+    xmppPwd = Props.get("xmpp.password") openOr ""
+    xmppServiceName = Props.get("xmpp.serviceName") openOr ""
+  }
+
+  def receive = {
+    case Start(id, who, usr) => {
+      logger.info("Start message received. User: %s, who: %s".format(usr, who))
+      xmppPullActors += (id -> context.actorOf(AkkaProps(new XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, usr))))
+    }
+    case Stop(id) => {
+      xmppPullActors.get(id).foreach { ref =>
+        context.stop(ref)
+        xmppPullActors -= id
+      }
+    }
+    case Fetch(id) => {
+      logger.info("Fetch message received")
+      xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
+    }
+    case _ => logger.info("Unknown message received")
+  }
+}

Modified: esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala (original)
+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala Mon Aug 20 11:16:34 2012
@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers(
   lazy val password: Parser[String] = user
 
   lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr
-  lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
+  lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
+  lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr
 
   lazy val emailAddr: Parser[String] = rep1(xchar) ^^ {
     case xs => xs.mkString
@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers(
   (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
     case mt ~ text => MailTo(mt, text.map(_ mkString))
   }) |
-  (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
+  (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
     case mt ~ text => XmppTo(mt, text.map(_ mkString))
   }) |
+  (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
+    case mt ~ text => XmppFrom(mt)
+  }) |
   (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ {
-      case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
-        HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
-    }) |
+    case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
+      HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
+  }) |
   (acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ {
     case url ~ tags => FetchAtom(UrlStore.make(url), tags)
   }) |

Modified: esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
==============================================================================
--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala (original)
+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala Mon Aug 20 11:16:34 2012
@@ -34,7 +34,15 @@ import java.util.Calendar
 import java.util.Date
 import scala.xml.{Text, Node, Elem => XmlElem}
 
-object Action extends Action with LongKeyedMetaMapper[Action] {
+import akka.actor.{Props => AkkaProps, ActorSystem}
+
+object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
+
+  val logger: Logger = Logger("org.apache.esme.model")
+  val sys = ActorSystem("camel")
+  val xmppSupervisor = sys.actorFor("XmppSupervisor")
+
+
   override def afterCommit = notifyDistributor _ :: super.afterCommit
 
   private def notifyDistributor(in: Action) {
@@ -58,6 +66,7 @@ object Action extends Action with LongKe
     } else {
       SchedulerActor ! SchedulerActor.StopRegular(in.id)
       MessagePullActor ! MessagePullActor.StopPullActor(in.id)
+      xmppSupervisor ! XmppSupervisor.Stop(in.id)
     }
   }
   
@@ -185,6 +194,9 @@ object Action extends Action with LongKe
  */
 class Action extends LongKeyedMapper[Action] {
 
+  import Action.xmppSupervisor
+  import Action.logger
+
   /**
    * Actors related to regularly executed actions are started here
    * This is done when the action is activated or at the start of the application
@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act
               case FetchRss(_, _) => new RssFeed(u, url.url, urlSourcePrefix + url.uniqueId, 0, tags)
             }
             MessagePullActor ! MessagePullActor.StartPullActor(id.is, lastMsg, feed)
-          
+
+          case _ =>
+        }
+      }
+      case XmppFrom(who) => {
+        User.find(user) match {
+          case Full(u) =>
+            xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
           case _ =>
         }
       }
@@ -569,6 +588,7 @@ sealed trait Performances
 case class MailTo(who: String, text: Option[String]) extends Performances
 case class HttpTo(url: String, user: String, password: String, headers: List[(String, String)], data: Option[String]) extends Performances
 case class XmppTo(who: String, text: Option[String]) extends Performances
+case class XmppFrom(who: String) extends Performances
 case class FetchAtom(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
 case class FetchRss(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
 case object PerformResend extends Performances