You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@esme.apache.org by AJ Prudhomme <aj...@atlas-title.com> on 2012/08/20 13:50:08 UTC
Re: svn commit: r1374973 - in /esme/branches/akka: ./
src/main/scala/bootstrap/liftweb/ src/main/scala/org/apache/esme/actor/
src/main/scala/org/apache/esme/lib/ src/main/scala/org/apache/esme/model/
Remove!
Sent from my Verizon Wireless Phone
lester@apache.org wrote:
>Author: lester
>Date: Mon Aug 20 11:16:34 2012
>New Revision: 1374973
>
>URL: http://svn.apache.org/viewvc?rev=1374973&view=rev
>Log:
>In scope of ESME-360: Began work on XMPP Consumer component action.
>
>Added:
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
>Modified:
> esme/branches/akka/build.sbt
> esme/branches/akka/pom.xml
> esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
> esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
>
>Modified: esme/branches/akka/build.sbt
>URL: http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/build.sbt (original)
>+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012
>@@ -8,7 +8,7 @@ version := "1.4"
>
> organization := "Apache Software Foundation"
>
>-scalaVersion := "2.9.1"
>+scalaVersion := "2.9.1"
>
> //scalacOptions ++= Seq("-unchecked", "-deprecation")
> scalacOptions ++= Seq("-deprecation")
>@@ -28,7 +28,7 @@ libraryDependencies ++= {
> val compassVersion = "2.1.1"
> val luceneVersion = "2.4.0"
> val scalazVersion = "6.0.4"
>- val akkaVersion = "2.0.2"
>+ val akkaVersion = "2.1-20120701-002745"
> val eclipsejettyVersion = "7.3.1.v20110307"
> val mortbayjettyVersion = "6.1.22"
> val slf4jVersion = "1.6.4"
>@@ -47,7 +47,7 @@ libraryDependencies ++= {
> "net.liftweb" %% "lift-textile" % liftVersion % "compile->default",
> "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
> "com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default",
>- "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" % "compile->default",
>+ "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % "compile->default",
> "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
> "javax.servlet" % "servlet-api" % "2.5" % "provided->default",
> "org.compass-project" % "compass" % compassVersion % "compile->default",
>
>Modified: esme/branches/akka/pom.xml
>URL: http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/pom.xml (original)
>+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012
>@@ -76,7 +76,7 @@
> <lift.version>2.4</lift.version>
> <scala.version>2.9.1</scala.version>
> <scalaz.version>6.0.4</scalaz.version>
>- <akka.version>2.1-SNAPSHOT</akka.version>
>+ <akka.version>2.1-20120701-002745</akka.version>
> <compass.version>2.1.1</compass.version>
> <lucene.version>2.4.0</lucene.version>
> <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server>
>@@ -91,17 +91,6 @@
> <url>http://repo2.maven.org/maven2/</url>
> </repository>
> <repository>
>- <id>scala-tools.org</id>
>- <name>Scala-Tools Maven2 Repository</name>
>- <url>http://scala-tools.org/repo-releases</url>
>- </repository>
>- <repository>
>- <id>scala-tools.org.snapshots</id>
>- <name>Scala-Tools Maven2 Repository for Snapshots</name>
>- <url>http://scala-tools.org/repo-snapshots</url>
>- <snapshots/>
>- </repository>
>- <repository>
> <id>typesafe</id>
> <name>Typesafe Repository Releases</name>
> <url>http://repo.typesafe.com/typesafe/releases</url>
>@@ -135,9 +124,9 @@
>
> <pluginRepositories>
> <pluginRepository>
>- <id>scala-tools.org</id>
>- <name>Scala-Tools Maven2 Repository</name>
>- <url>http://scala-tools.org/repo-releases</url>
>+ <id>typesafe</id>
>+ <name>Typesafe Repository Releases</name>
>+ <url>http://repo.typesafe.com/typesafe/releases</url>
> </pluginRepository>
> </pluginRepositories>
>
>@@ -232,7 +221,7 @@
> <dependency>
> <groupId>com.typesafe.akka</groupId>
> <artifactId>akka-camel</artifactId>
>- <version>2.1-SNAPSHOT</version>
>+ <version>${akka.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.camel</groupId>
>
>Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala (original)
>+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon Aug 20 11:16:34 2012
>@@ -27,6 +27,8 @@ import net.liftweb.http.auth._
> import net.liftweb.sitemap._
> import net.liftweb.sitemap.Loc._
> import Helpers._
>+import akka.actor.{Props => AkkaProps, ActorSystem}
>+
> //import TimeHelpers.intToTimeSpanBuilder
> //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier, DefaultConnectionIdentifier, ConnectionIdentifier}
> import java.sql.{Connection, DriverManager}
>@@ -249,6 +251,8 @@ class Boot extends Loggable {
> ConvDistributor.touch
> // ScalaInterpreter.touch
>
>+ val sys = ActorSystem("camel")
>+ val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
>
> // Initiating popular links and resent messages
> val resentPeriod = Props.getLong("stats.resent.period", 1 week)
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala (original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala Mon Aug 20 11:16:34 2012
>@@ -64,8 +64,8 @@ object UserActor {
> val xmppUsr = Props.get("xmpp.user") openOr ""
> val xmppPwd = Props.get("xmpp.password") openOr ""
> val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
>- lazy val sys = ActorSystem("camel")
>- lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)))
>+ val sys = ActorSystem("camel")
>+ val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
> }
>
>
>@@ -289,8 +289,14 @@ class UserActor extends LiftActor {
> Distributor !
> Distributor.AddMessageToMailbox(id, msg, ResendReason(userId))
>
>- case FetchFeed(_, _) =>
>- MessagePullActor ! MessagePullActor.Fetch(td.performId)
>+ case XmppFrom(_) => {
>+ val sys = ActorSystem("camel")
>+ sys.actorFor("XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
>+ }
>+
>+
>+ case FetchFeed(_, _) =>
>+ MessagePullActor ! MessagePullActor.Fetch(td.performId)
>
> case ScalaInterpret => logger.info("Scala interpreter is disabled!")
> /*if (msg.source.is != "scala")
>
>Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala (added)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala Mon Aug 20 11:16:34 2012
>@@ -0,0 +1,51 @@
>+package org.apache.esme.actor
>+
>+import akka.camel.{CamelMessage, Consumer}
>+
>+import net.liftweb.common.{Empty, Logger}
>+import collection.immutable.Queue
>+import org.apache.esme.actor.Distributor.UserCreatedMessage
>+import org.apache.esme.model.User
>+
>+/**
>+ * Created with IntelliJ IDEA.
>+ * User: lester
>+ * Date: 17.08.12
>+ * Time: 2:26
>+ */
>+
>+object XmppReceiver {
>+ val logger: Logger = Logger("org.apache.esme.actor")
>+ case class FetchMessages()
>+}
>+
>+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String, esmePwd: String, xmppServiceName: String, participant: String, user: User) extends Consumer {
>+
>+ import XmppReceiver._
>+
>+ var messages: List[(String, Long)] = List.empty
>+
>+ def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: %s".format(uri)); uri}
>+
>+ def receive = {
>+ case msg: CamelMessage => {
>+ messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages
>+ }
>+ case FetchMessages => {
>+ messages.foreach(message =>
>+ Distributor ! UserCreatedMessage(
>+ if (user != null) {user.id} else 0,
>+ message._1,
>+ Nil,
>+ message._2,
>+ Empty,
>+ participant,
>+ Empty,
>+ None
>+ )
>+ )
>+ messages = List.empty
>+ }
>+ case _ => logger.error("Incoming message is not Camel Message!")
>+ }
>+}
>\ No newline at end of file
>
>Added: esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala (added)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala Mon Aug 20 11:16:34 2012
>@@ -0,0 +1,66 @@
>+package org.apache.esme.actor
>+
>+import akka.actor.{ActorRef, Actor, Props => AkkaProps}
>+import net.liftweb.util.Props
>+import org.apache.esme.actor.XmppReceiver.FetchMessages
>+import org.apache.esme.model.User
>+import net.liftweb.common.Logger
>+
>+/**
>+ * Created with IntelliJ IDEA.
>+ * User: lester
>+ * Date: 17.08.12
>+ * Time: 4:21
>+ */
>+
>+object XmppSupervisor {
>+ val logger: Logger = Logger("org.apache.esme.actor")
>+
>+
>+ sealed trait XmppSupervisorActions
>+ case class Fetch(id: Long) extends XmppSupervisorActions
>+ case class Start(id: Long, who: String, usr: User) extends XmppSupervisorActions
>+ case class Stop(id: Long) extends XmppSupervisorActions
>+}
>+
>+class XmppSupervisor extends Actor {
>+
>+ import XmppSupervisor._
>+
>+ private var xmppPullActors: Map[Long, ActorRef] = Map.empty
>+
>+ var xmppHost: String = _
>+ var xmppPort: String = _
>+ var xmppUsr: String = _
>+ var xmppPwd: String = _
>+ var xmppServiceName: String = _
>+
>+
>+ override def preStart() {
>+ logger.info("preStart() called")
>+
>+ xmppHost = Props.get("xmpp.host") openOr ""
>+ xmppPort = Props.get("xmpp.port") openOr ""
>+ xmppUsr = Props.get("xmpp.user") openOr ""
>+ xmppPwd = Props.get("xmpp.password") openOr ""
>+ xmppServiceName = Props.get("xmpp.serviceName") openOr ""
>+ }
>+
>+ def receive = {
>+ case Start(id, who, usr) => {
>+ logger.info("Start message received. User: %s, who: %s".format(usr, who))
>+ xmppPullActors += (id -> context.actorOf(AkkaProps(new XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, usr))))
>+ }
>+ case Stop(id) => {
>+ xmppPullActors.get(id).foreach { ref =>
>+ context.stop(ref)
>+ xmppPullActors -= id
>+ }
>+ }
>+ case Fetch(id) => {
>+ logger.info("Fetch message received")
>+ xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
>+ }
>+ case _ => logger.info("Unknown message received")
>+ }
>+}
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala (original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala Mon Aug 20 11:16:34 2012
>@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers(
> lazy val password: Parser[String] = user
>
> lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr
>- lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
>+ lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
>+ lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr
>
> lazy val emailAddr: Parser[String] = rep1(xchar) ^^ {
> case xs => xs.mkString
>@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers(
> (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> case mt ~ text => MailTo(mt, text.map(_ mkString))
> }) |
>- (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>+ (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> case mt ~ text => XmppTo(mt, text.map(_ mkString))
> }) |
>+ (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>+ case mt ~ text => XmppFrom(mt)
>+ }) |
> (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ {
>- case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
>- HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
>- }) |
>+ case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
>+ HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
>+ }) |
> (acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ {
> case url ~ tags => FetchAtom(UrlStore.make(url), tags)
> }) |
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
>URL: http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala (original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala Mon Aug 20 11:16:34 2012
>@@ -34,7 +34,15 @@ import java.util.Calendar
> import java.util.Date
> import scala.xml.{Text, Node, Elem => XmlElem}
>
>-object Action extends Action with LongKeyedMetaMapper[Action] {
>+import akka.actor.{Props => AkkaProps, ActorSystem}
>+
>+object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
>+
>+ val logger: Logger = Logger("org.apache.esme.model")
>+ val sys = ActorSystem("camel")
>+ val xmppSupervisor = sys.actorFor("XmppSupervisor")
>+
>+
> override def afterCommit = notifyDistributor _ :: super.afterCommit
>
> private def notifyDistributor(in: Action) {
>@@ -58,6 +66,7 @@ object Action extends Action with LongKe
> } else {
> SchedulerActor ! SchedulerActor.StopRegular(in.id)
> MessagePullActor ! MessagePullActor.StopPullActor(in.id)
>+ xmppSupervisor ! XmppSupervisor.Stop(in.id)
> }
> }
>
>@@ -185,6 +194,9 @@ object Action extends Action with LongKe
> */
> class Action extends LongKeyedMapper[Action] {
>
>+ import Action.xmppSupervisor
>+ import Action.logger
>+
> /**
> * Actors related to regularly executed actions are started here
> * This is done when the action is activated or at the start of the application
>@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act
> case FetchRss(_, _) => new RssFeed(u, url.url, urlSourcePrefix + url.uniqueId, 0, tags)
> }
> MessagePullActor ! MessagePullActor.StartPullActor(id.is, lastMsg, feed)
>-
>+
>+ case _ =>
>+ }
>+ }
>+ case XmppFrom(who) => {
>+ User.find(user) match {
>+ case Full(u) =>
>+ xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
> case _ =>
> }
> }
>@@ -569,6 +588,7 @@ sealed trait Performances
> case class MailTo(who: String, text: Option[String]) extends Performances
> case class HttpTo(url: String, user: String, password: String, headers: List[(String, String)], data: Option[String]) extends Performances
> case class XmppTo(who: String, text: Option[String]) extends Performances
>+case class XmppFrom(who: String) extends Performances
> case class FetchAtom(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
> case class FetchRss(override val url: UrlStore, override val tags: List[String]) extends FetchFeed(url, tags)
> case object PerformResend extends Performances
>
>
Re: svn commit: r1374973 - in /esme/branches/akka: ./
src/main/scala/bootstrap/liftweb/ src/main/scala/org/apache/esme/actor/
src/main/scala/org/apache/esme/lib/ src/main/scala/org/apache/esme/model/
Posted by Vladimir Ivanov <le...@gmail.com>.
Done.
Sorry, comments were inserted due to a recent upgrade of my IDE.
Best Regards,
Vladimir
2012/8/20 AJ Prudhomme <aj...@atlas-title.com>
> Remove!
>
> Sent from my Verizon Wireless Phone
>
> lester@apache.org wrote:
>
> >Author: lester
> >Date: Mon Aug 20 11:16:34 2012
> >New Revision: 1374973
> >
> >URL: http://svn.apache.org/viewvc?rev=1374973&view=rev
> >Log:
> >In scope of ESME-360: Began work on XMPP Consumer component action.
> >
> >Added:
> >
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> >
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> >Modified:
> > esme/branches/akka/build.sbt
> > esme/branches/akka/pom.xml
> > esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
> >
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
> > esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> > esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
> >
> >Modified: esme/branches/akka/build.sbt
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/build.sbt (original)
> >+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012
> >@@ -8,7 +8,7 @@ version := "1.4"
> >
> > organization := "Apache Software Foundation"
> >
> >-scalaVersion := "2.9.1"
> >+scalaVersion := "2.9.1"
> >
> > //scalacOptions ++= Seq("-unchecked", "-deprecation")
> > scalacOptions ++= Seq("-deprecation")
> >@@ -28,7 +28,7 @@ libraryDependencies ++= {
> > val compassVersion = "2.1.1"
> > val luceneVersion = "2.4.0"
> > val scalazVersion = "6.0.4"
> >- val akkaVersion = "2.0.2"
> >+ val akkaVersion = "2.1-20120701-002745"
> > val eclipsejettyVersion = "7.3.1.v20110307"
> > val mortbayjettyVersion = "6.1.22"
> > val slf4jVersion = "1.6.4"
> >@@ -47,7 +47,7 @@ libraryDependencies ++= {
> > "net.liftweb" %% "lift-textile" % liftVersion % "compile->default",
> > "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
> > "com.typesafe.akka" % "akka-actor" % akkaVersion %
> "compile->default",
> >- "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" %
> "compile->default",
> >+ "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" %
> "compile->default",
> > "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
> > "javax.servlet" % "servlet-api" % "2.5" % "provided->default",
> > "org.compass-project" % "compass" % compassVersion %
> "compile->default",
> >
> >Modified: esme/branches/akka/pom.xml
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/pom.xml (original)
> >+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012
> >@@ -76,7 +76,7 @@
> > <lift.version>2.4</lift.version>
> > <scala.version>2.9.1</scala.version>
> > <scalaz.version>6.0.4</scalaz.version>
> >- <akka.version>2.1-SNAPSHOT</akka.version>
> >+ <akka.version>2.1-20120701-002745</akka.version>
> > <compass.version>2.1.1</compass.version>
> > <lucene.version>2.4.0</lucene.version>
> > <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server>
> >@@ -91,17 +91,6 @@
> > <url>http://repo2.maven.org/maven2/</url>
> > </repository>
> > <repository>
> >- <id>scala-tools.org</id>
> >- <name>Scala-Tools Maven2 Repository</name>
> >- <url>http://scala-tools.org/repo-releases</url>
> >- </repository>
> >- <repository>
> >- <id>scala-tools.org.snapshots</id>
> >- <name>Scala-Tools Maven2 Repository for Snapshots</name>
> >- <url>http://scala-tools.org/repo-snapshots</url>
> >- <snapshots/>
> >- </repository>
> >- <repository>
> > <id>typesafe</id>
> > <name>Typesafe Repository Releases</name>
> > <url>http://repo.typesafe.com/typesafe/releases</url>
> >@@ -135,9 +124,9 @@
> >
> > <pluginRepositories>
> > <pluginRepository>
> >- <id>scala-tools.org</id>
> >- <name>Scala-Tools Maven2 Repository</name>
> >- <url>http://scala-tools.org/repo-releases</url>
> >+ <id>typesafe</id>
> >+ <name>Typesafe Repository Releases</name>
> >+ <url>http://repo.typesafe.com/typesafe/releases</url>
> > </pluginRepository>
> > </pluginRepositories>
> >
> >@@ -232,7 +221,7 @@
> > <dependency>
> > <groupId>com.typesafe.akka</groupId>
> > <artifactId>akka-camel</artifactId>
> >- <version>2.1-SNAPSHOT</version>
> >+ <version>${akka.version}</version>
> > </dependency>
> > <dependency>
> > <groupId>org.apache.camel</groupId>
> >
> >Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
> (original)
> >+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon
> Aug 20 11:16:34 2012
> >@@ -27,6 +27,8 @@ import net.liftweb.http.auth._
> > import net.liftweb.sitemap._
> > import net.liftweb.sitemap.Loc._
> > import Helpers._
> >+import akka.actor.{Props => AkkaProps, ActorSystem}
> >+
> > //import TimeHelpers.intToTimeSpanBuilder
> > //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier,
> DefaultConnectionIdentifier, ConnectionIdentifier}
> > import java.sql.{Connection, DriverManager}
> >@@ -249,6 +251,8 @@ class Boot extends Loggable {
> > ConvDistributor.touch
> > // ScalaInterpreter.touch
> >
> >+ val sys = ActorSystem("camel")
> >+ val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()),
> "XmppSupervisor")
> >
> > // Initiating popular links and resent messages
> > val resentPeriod = Props.getLong("stats.resent.period", 1 week)
> >
> >Modified:
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >---
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
> (original)
> >+++
> esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala Mon
> Aug 20 11:16:34 2012
> >@@ -64,8 +64,8 @@ object UserActor {
> > val xmppUsr = Props.get("xmpp.user") openOr ""
> > val xmppPwd = Props.get("xmpp.password") openOr ""
> > val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
> >- lazy val sys = ActorSystem("camel")
> >- lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost,
> xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)))
> >+ val sys = ActorSystem("camel")
> >+ val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost,
> xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
> > }
> >
> >
> >@@ -289,8 +289,14 @@ class UserActor extends LiftActor {
> > Distributor !
> > Distributor.AddMessageToMailbox(id, msg,
> ResendReason(userId))
> >
> >- case FetchFeed(_, _) =>
> >- MessagePullActor ! MessagePullActor.Fetch(td.performId)
> >+ case XmppFrom(_) => {
> >+ val sys = ActorSystem("camel")
> >+ sys.actorFor("XmppSupervisor") !
> XmppSupervisor.Fetch(td.performId)
> >+ }
> >+
> >+
> >+ case FetchFeed(_, _) =>
> >+ MessagePullActor ! MessagePullActor.Fetch(td.performId)
> >
> > case ScalaInterpret => logger.info("Scala interpreter is
> disabled!")
> > /*if (msg.source.is != "scala")
> >
> >Added:
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto
>
> >==============================================================================
> >---
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> (added)
> >+++
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
> Mon Aug 20 11:16:34 2012
> >@@ -0,0 +1,51 @@
> >+package org.apache.esme.actor
> >+
> >+import akka.camel.{CamelMessage, Consumer}
> >+
> >+import net.liftweb.common.{Empty, Logger}
> >+import collection.immutable.Queue
> >+import org.apache.esme.actor.Distributor.UserCreatedMessage
> >+import org.apache.esme.model.User
> >+
> >+/**
> >+ * Created with IntelliJ IDEA.
> >+ * User: lester
> >+ * Date: 17.08.12
> >+ * Time: 2:26
> >+ */
> >+
> >+object XmppReceiver {
> >+ val logger: Logger = Logger("org.apache.esme.actor")
> >+ case class FetchMessages()
> >+}
> >+
> >+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String,
> esmePwd: String, xmppServiceName: String, participant: String, user: User)
> extends Consumer {
> >+
> >+ import XmppReceiver._
> >+
> >+ var messages: List[(String, Long)] = List.empty
> >+
> >+ def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format
> (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI
> is: %s".format(uri)); uri}
> >+
> >+ def receive = {
> >+ case msg: CamelMessage => {
> >+ messages = (msg.bodyAs[String], System.currentTimeMillis) ::
> messages
> >+ }
> >+ case FetchMessages => {
> >+ messages.foreach(message =>
> >+ Distributor ! UserCreatedMessage(
> >+ if (user != null) {user.id} else 0,
> >+ message._1,
> >+ Nil,
> >+ message._2,
> >+ Empty,
> >+ participant,
> >+ Empty,
> >+ None
> >+ )
> >+ )
> >+ messages = List.empty
> >+ }
> >+ case _ => logger.error("Incoming message is not Camel Message!")
> >+ }
> >+}
> >\ No newline at end of file
> >
> >Added:
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto
>
> >==============================================================================
> >---
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> (added)
> >+++
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
> Mon Aug 20 11:16:34 2012
> >@@ -0,0 +1,66 @@
> >+package org.apache.esme.actor
> >+
> >+import akka.actor.{ActorRef, Actor, Props => AkkaProps}
> >+import net.liftweb.util.Props
> >+import org.apache.esme.actor.XmppReceiver.FetchMessages
> >+import org.apache.esme.model.User
> >+import net.liftweb.common.Logger
> >+
> >+/**
> >+ * Created with IntelliJ IDEA.
> >+ * User: lester
> >+ * Date: 17.08.12
> >+ * Time: 4:21
> >+ */
> >+
> >+object XmppSupervisor {
> >+ val logger: Logger = Logger("org.apache.esme.actor")
> >+
> >+
> >+ sealed trait XmppSupervisorActions
> >+ case class Fetch(id: Long) extends XmppSupervisorActions
> >+ case class Start(id: Long, who: String, usr: User) extends
> XmppSupervisorActions
> >+ case class Stop(id: Long) extends XmppSupervisorActions
> >+}
> >+
> >+class XmppSupervisor extends Actor {
> >+
> >+ import XmppSupervisor._
> >+
> >+ private var xmppPullActors: Map[Long, ActorRef] = Map.empty
> >+
> >+ var xmppHost: String = _
> >+ var xmppPort: String = _
> >+ var xmppUsr: String = _
> >+ var xmppPwd: String = _
> >+ var xmppServiceName: String = _
> >+
> >+
> >+ override def preStart() {
> >+ logger.info("preStart() called")
> >+
> >+ xmppHost = Props.get("xmpp.host") openOr ""
> >+ xmppPort = Props.get("xmpp.port") openOr ""
> >+ xmppUsr = Props.get("xmpp.user") openOr ""
> >+ xmppPwd = Props.get("xmpp.password") openOr ""
> >+ xmppServiceName = Props.get("xmpp.serviceName") openOr ""
> >+ }
> >+
> >+ def receive = {
> >+ case Start(id, who, usr) => {
> >+ logger.info("Start message received. User: %s, who:
> %s".format(usr, who))
> >+ xmppPullActors += (id -> context.actorOf(AkkaProps(new
> XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName,
> who, usr))))
> >+ }
> >+ case Stop(id) => {
> >+ xmppPullActors.get(id).foreach { ref =>
> >+ context.stop(ref)
> >+ xmppPullActors -= id
> >+ }
> >+ }
> >+ case Fetch(id) => {
> >+ logger.info("Fetch message received")
> >+ xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
> >+ }
> >+ case _ => logger.info("Unknown message received")
> >+ }
> >+}
> >
> >Modified:
> esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> (original)
> >+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
> Mon Aug 20 11:16:34 2012
> >@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers(
> > lazy val password: Parser[String] = user
> >
> > lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr
> >- lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
> >+ lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
> >+ lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr
> >
> > lazy val emailAddr: Parser[String] = rep1(xchar) ^^ {
> > case xs => xs.mkString
> >@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers(
> > (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> > case mt ~ text => MailTo(mt, text.map(_ mkString))
> > }) |
> >- (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> >+ (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> > case mt ~ text => XmppTo(mt, text.map(_ mkString))
> > }) |
> >+ (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
> >+ case mt ~ text => XmppFrom(mt)
> >+ }) |
> > (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ {
> >- case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
> >- HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
> >- }) |
> >+ case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
> >+ HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
> >+ }) |
> > (acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ {
> > case url ~ tags => FetchAtom(UrlStore.make(url), tags)
> > }) |
> >
> >Modified:
> esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
> >URL:
> http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>
> >==============================================================================
> >--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
> (original)
> >+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
> Mon Aug 20 11:16:34 2012
> >@@ -34,7 +34,15 @@ import java.util.Calendar
> > import java.util.Date
> > import scala.xml.{Text, Node, Elem => XmlElem}
> >
> >-object Action extends Action with LongKeyedMetaMapper[Action] {
> >+import akka.actor.{Props => AkkaProps, ActorSystem}
> >+
> >+object Action extends Action with LongKeyedMetaMapper[Action] with
> Logger {
> >+
> >+ val logger: Logger = Logger("org.apache.esme.model")
> >+ val sys = ActorSystem("camel")
> >+ val xmppSupervisor = sys.actorFor("XmppSupervisor")
> >+
> >+
> > override def afterCommit = notifyDistributor _ :: super.afterCommit
> >
> > private def notifyDistributor(in: Action) {
> >@@ -58,6 +66,7 @@ object Action extends Action with LongKe
> > } else {
> > SchedulerActor ! SchedulerActor.StopRegular(in.id)
> > MessagePullActor ! MessagePullActor.StopPullActor(in.id)
> >+ xmppSupervisor ! XmppSupervisor.Stop(in.id)
> > }
> > }
> >
> >@@ -185,6 +194,9 @@ object Action extends Action with LongKe
> > */
> > class Action extends LongKeyedMapper[Action] {
> >
> >+ import Action.xmppSupervisor
> >+ import Action.logger
> >+
> > /**
> > * Actors related to regularly executed actions are started here
> > * This is done when the action is activated or at the start of the
> application
> >@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act
> > case FetchRss(_, _) => new RssFeed(u, url.url,
> urlSourcePrefix + url.uniqueId, 0, tags)
> > }
> > MessagePullActor ! MessagePullActor.StartPullActor(id.is,
> lastMsg, feed)
> >-
> >+
> >+ case _ =>
> >+ }
> >+ }
> >+ case XmppFrom(who) => {
> >+ User.find(user) match {
> >+ case Full(u) =>
> >+ xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
> > case _ =>
> > }
> > }
> >@@ -569,6 +588,7 @@ sealed trait Performances
> > case class MailTo(who: String, text: Option[String]) extends Performances
> > case class HttpTo(url: String, user: String, password: String, headers:
> List[(String, String)], data: Option[String]) extends Performances
> > case class XmppTo(who: String, text: Option[String]) extends Performances
> >+case class XmppFrom(who: String) extends Performances
> > case class FetchAtom(override val url: UrlStore, override val tags:
> List[String]) extends FetchFeed(url, tags)
> > case class FetchRss(override val url: UrlStore, override val tags:
> List[String]) extends FetchFeed(url, tags)
> > case object PerformResend extends Performances
> >
> >
>
--
Best Regards,
Vladimir Ivanov